1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.streams.impl.ProtonPublisherWrapperImpl;
12 
13 
14 import hunt.proton.amqp.transport.Source;
15 import hunt.proton.amqp.transport.Target;
16 import hunt.proton.message.Message;
17 import hunt.amqp.streams.Subscriber;
18 import hunt.amqp.streams.Subscription;
19 
20 import hunt.amqp.ProtonReceiver;
21 import hunt.amqp.streams.Delivery;
22 import hunt.amqp.streams.ProtonPublisher;
23 import hunt.amqp.streams.impl.ProtonPublisherImpl;
24 
/**
 * Publisher of `Message` values backed by a `ProtonPublisherImpl`, which
 * publishes `Delivery` objects.
 *
 * Each subscriber passed to `subscribe` is wrapped in an adapter that hands
 * the delivery's message to the subscriber and then accepts the delivery.
 * All other operations are forwarded unchanged to the wrapped publisher.
 */
class ProtonPublisherWrapperImpl : ProtonPublisher!Message {

  /// The wrapped delivery-based publisher every call is forwarded to.
  private ProtonPublisherImpl _delegate;

  /**
   * Params:
   *   delegat = the delivery-based publisher to wrap
   */
  this(ProtonPublisherImpl delegat) {
    this._delegate = delegat;
  }

  /**
   * Subscribes `subscriber` to the wrapped publisher through a
   * `Delivery` -> `Message` adapter.
   *
   * Params:
   *   subscriber = the message subscriber; must not be null
   *
   * Throws:
   *   Exception if `subscriber` is null. Without this check the null
   *   reference would only be dereferenced later, inside the adapter's
   *   callbacks, far away from the faulty call site.
   */
  public void subscribe(Subscriber!Message subscriber) {
    if (subscriber is null) {
      throw new Exception("A subscriber must be supplied");
    }

    _delegate.subscribe(new AmqpSubscriberWrapperImpl(subscriber));
  }

  /// Returns the wrapped publisher's emit-on-connection-end flag.
  public bool isEmitOnConnectionEnd() {
    return _delegate.isEmitOnConnectionEnd();
  }

  /// Forwards the emit-on-connection-end flag to the wrapped publisher.
  public void setEmitOnConnectionEnd(bool emitOnConnectionEnd) {
    _delegate.setEmitOnConnectionEnd(emitOnConnectionEnd);
  }

  // ==================================================

  /**
   * Adapter subscribed to the wrapped publisher on behalf of a
   * `Subscriber!Message`: it receives `Delivery` objects and passes their
   * messages on to the wrapped subscriber.
   */
  class AmqpSubscriberWrapperImpl : Subscriber!Delivery {

    /// The message subscriber all signals are relayed to.
    private Subscriber!Message delegateSub;

    /**
     * Params:
     *   subscriber = the message subscriber to relay signals to
     */
    this(Subscriber!Message subscriber) {
      this.delegateSub = subscriber;
    }

    /// Relays the subscription to the wrapped subscriber unchanged.
    public void onSubscribe(Subscription s) {
      delegateSub.onSubscribe(s);
    }

    /**
     * Passes the delivery's message to the wrapped subscriber, then
     * accepts the delivery.
     *
     * The accept happens only after the subscriber's `onNext` has
     * returned; if `onNext` throws, `accept` is not reached.
     */
    public void onNext(Delivery d) {
      Message m = d.message();
      delegateSub.onNext(m);
      d.accept();
    }

    /// Relays the error to the wrapped subscriber unchanged.
    public void onError(Throwable t) {
      delegateSub.onError(t);
    }

    /// Relays completion to the wrapped subscriber.
    public void onComplete() {
      delegateSub.onComplete();
    }
  }

  /// Returns the receiver link of the wrapped publisher.
  public ProtonReceiver getLink() {
    return _delegate.getLink();
  }

  // ==================================================

  /// Returns the remote address reported by the wrapped publisher.
  public string getRemoteAddress() {
    return _delegate.getRemoteAddress();
  }

  /**
   * Sets the source on the wrapped publisher.
   *
   * Returns: this wrapper, so calls can be chained.
   */
  public ProtonPublisher!Message setSource(Source source) {
    _delegate.setSource(source);
    return this;
  }

  /// Returns the source held by the wrapped publisher.
  public Source getSource() {
    return _delegate.getSource();
  }

  /**
   * Sets the target on the wrapped publisher.
   *
   * Returns: this wrapper, so calls can be chained.
   */
  public ProtonPublisher!Message setTarget(Target target) {
    _delegate.setTarget(target);
    return this;
  }

  /// Returns the target held by the wrapped publisher.
  public Target getTarget() {
    return _delegate.getTarget();
  }

  /// Returns the remote source reported by the wrapped publisher.
  public Source getRemoteSource() {
    return _delegate.getRemoteSource();
  }

  /// Returns the remote target reported by the wrapped publisher.
  public Target getRemoteTarget() {
    return _delegate.getRemoteTarget();
  }
}