/*
 * hunt-amqp: AMQP library for D programming language, based on hunt-net.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.streams.impl.ProtonPublisherWrapperImpl;


import hunt.proton.amqp.transport.Source;
import hunt.proton.amqp.transport.Target;
import hunt.proton.message.Message;
import hunt.amqp.streams.Subscriber;
import hunt.amqp.streams.Subscription;

import hunt.amqp.ProtonReceiver;
import hunt.amqp.streams.Delivery;
import hunt.amqp.streams.ProtonPublisher;
import hunt.amqp.streams.impl.ProtonPublisherImpl;

/**
 * Exposes a `ProtonPublisherImpl` as a `ProtonPublisher!Message`.
 *
 * Every operation is forwarded to the wrapped publisher. Subscribers that
 * expect `Message` values are adapted through `AmqpSubscriberWrapperImpl`,
 * which is what actually gets subscribed to the wrapped publisher.
 */
class ProtonPublisherWrapperImpl : ProtonPublisher!Message {

    /// The wrapped publisher that all calls are forwarded to.
    private ProtonPublisherImpl _publisher;

    /**
     * Params:
     *   delegat = the publisher to wrap; stored as given.
     */
    this(ProtonPublisherImpl delegat) {
        _publisher = delegat;
    }

    /**
     * Wraps `subscriber` in a new `AmqpSubscriberWrapperImpl` and subscribes
     * that adapter to the wrapped publisher.
     */
    void subscribe(Subscriber!Message subscriber) {
        auto adapter = new AmqpSubscriberWrapperImpl(subscriber);
        _publisher.subscribe(adapter);
    }

    /// Returns the wrapped publisher's emit-on-connection-end flag.
    bool isEmitOnConnectionEnd() {
        return _publisher.isEmitOnConnectionEnd();
    }

    /// Forwards the emit-on-connection-end flag to the wrapped publisher.
    void setEmitOnConnectionEnd(bool emitOnConnectionEnd) {
        _publisher.setEmitOnConnectionEnd(emitOnConnectionEnd);
    }

    // ==================================================

    /**
     * Adapts a `Subscriber!Message` so that it can be used where a
     * `Subscriber!Delivery` is required.
     *
     * `onSubscribe`, `onError` and `onComplete` are passed through unchanged;
     * `onNext` hands over the delivery's message and then accepts the delivery.
     */
    class AmqpSubscriberWrapperImpl : Subscriber!Delivery {

        /// The message subscriber that receives the forwarded signals.
        private Subscriber!Message _downstream;

        /**
         * Params:
         *   subscriber = the message subscriber to forward signals to.
         */
        this(Subscriber!Message subscriber) {
            _downstream = subscriber;
        }

        /// Passes the subscription straight through to the wrapped subscriber.
        void onSubscribe(Subscription s) {
            _downstream.onSubscribe(s);
        }

        /**
         * Forwards the delivery's message to the wrapped subscriber, then
         * accepts the delivery.
         *
         * `accept()` is only reached once the wrapped subscriber's `onNext`
         * has returned; if that call throws, the delivery is not accepted here.
         */
        void onNext(Delivery d) {
            _downstream.onNext(d.message());
            d.accept();
        }

        /// Passes the error straight through to the wrapped subscriber.
        void onError(Throwable t) {
            _downstream.onError(t);
        }

        /// Passes the completion signal straight through to the wrapped subscriber.
        void onComplete() {
            _downstream.onComplete();
        }
    }

    /// Returns the wrapped publisher's receiver link.
    ProtonReceiver getLink() {
        return _publisher.getLink();
    }

    // ==================================================

    /// Returns the remote address reported by the wrapped publisher.
    string getRemoteAddress() {
        return _publisher.getRemoteAddress();
    }

    /**
     * Sets the source on the wrapped publisher.
     *
     * Returns: this wrapper (not the wrapped publisher), so calls can be chained.
     */
    ProtonPublisher!Message setSource(Source source) {
        _publisher.setSource(source);
        return this;
    }

    /// Returns the source held by the wrapped publisher.
    Source getSource() {
        return _publisher.getSource();
    }

    /**
     * Sets the target on the wrapped publisher.
     *
     * Returns: this wrapper (not the wrapped publisher), so calls can be chained.
     */
    ProtonPublisher!Message setTarget(Target target) {
        _publisher.setTarget(target);
        return this;
    }

    /// Returns the target held by the wrapped publisher.
    Target getTarget() {
        return _publisher.getTarget();
    }

    /// Returns the remote source reported by the wrapped publisher.
    Source getRemoteSource() {
        return _publisher.getRemoteSource();
    }

    /// Returns the remote target reported by the wrapped publisher.
    Target getRemoteTarget() {
        return _publisher.getRemoteTarget();
    }
}