1 /*
2  * Copyright (c) 2017-2018 sel-project
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to deal
6  * in the Software without restriction, including without limitation the rights
7  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8  * copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in all
12  * copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20  * SOFTWARE.
21  *
22  */
23 /**
24  * Copyright: Copyright (c) 2017-2018 sel-project
25  * License: MIT
26  * Authors: Kripth
27  * Source: $(HTTP github.com/sel-project/sel-net/sel/net/modifiers.d, sel/net/modifiers.d)
28  */
29 module sel.net.modifiers;
30 
31 import std.bitmanip : _write = write, _read = read;
32 import std.conv : to;
33 import std.socket : Socket, Address;
34 import std.system : Endian;
35 import std.traits : isNumeric, isIntegral, Parameters;
36 import std.zlib : Compress, UnCompress;
37 
38 import sel.net.stream : Stream;
39 
40 abstract class ModifierStream : Stream {
41 
42 	public Stream stream;
43 
44 	public this(Stream stream) {
45 		super(stream.socket);
46 		this.stream = stream;
47 	}
48 
49 	public override ptrdiff_t send(ubyte[] buffer) {
50 		return this.stream.send(buffer);
51 	}
52 
53 	public override ubyte[] receive() {
54 		return this.stream.receive();
55 	}
56 
57 	public override pure nothrow @property @safe @nogc ptrdiff_t lastRecv() {
58 		return this.stream.lastRecv();
59 	}
60 
61 }
62 
63 class PaddedStream(size_t paddingIndex) : ModifierStream {
64 
65 	private ubyte[] padding;
66 
67 	public this(Stream stream, ubyte[] padding) {
68 		super(stream);
69 	}
70 
71 	public override ptrdiff_t send(ubyte[] payload) {
72 		static if(paddingIndex == 0) {
73 			payload = this.padding ~ payload;
74 		} else {
75 			payload = payload[0..paddingIndex] ~ this.padding ~ payload[paddingIndex..$];
76 		}
77 		return super.send(payload);
78 	}
79 
80 	public override ubyte[] receive() {
81 		ubyte[] payload = super.receive();
82 		if(payload.length >= this.padding.length + paddingIndex) {
83 			static if(paddingIndex == 0) {
84 				return payload[this.padding.length..$];
85 			} else {
86 				return payload[0..this.padding.length] ~ payload[this.padding.length..$];
87 			}
88 		}
89 		return [];
90 	}
91 
92 }
93 
94 class LengthPrefixedStream(T, Endian endianness=Endian.bigEndian) : ModifierStream if(isNumeric!T || (is(typeof(T.encode)) && isIntegral!(Parameters!(T.encode)[0]))) {
95 	
96 	static if(isNumeric!T) {
97 		enum requiredSize = T.sizeof;
98 	} else {
99 		enum requiredSize = 1;
100 	}
101 
102 	public size_t maxLength;
103 
104 	private ubyte[] next;
105 	private size_t nextLength = 0;
106 	
107 	public this(Stream stream, size_t maxLength=size_t.max) {
108 		super(stream);
109 		this.maxLength = maxLength;
110 	}
111 	
112 	/**
113 	 * Sends a buffer prefixing it with its length.
114 	 * Returns: the number of bytes sent
115 	 */
116 	public override ptrdiff_t send(ubyte[] payload) {
117 		static if(isNumeric!T) {
118 			immutable length = payload.length.to!T;
119 			payload = new ubyte[requiredSize] ~ payload;
120 			_write!(T, endianness)(payload, length, 0);
121 		} else {
122 			payload = T.encode(payload.length.to!(Parameters!(T.encode)[0])) ~ payload;
123 		}
124 		return super.send(payload);
125 	}
126 	
127 	/**
128 	 * Returns: an array of bytes as indicated by the length or an empty array on failure or when the indicated length exceeds the max length
129 	 */
130 	public override ubyte[] receive() {
131 		return this.receiveImpl();
132 	}
133 	
134 	private ubyte[] receiveImpl() {
135 		if(this.nextLength == 0) {
136 			// read length of the packet
137 			while(this.next.length < requiredSize) {
138 				if(!this.read()) return [];
139 			}
140 			static if(isNumeric!T) {
141 				this.nextLength = _read!(T, endianness)(this.next);
142 			} else {
143 				this.nextLength = T.fromBuffer(this.next);
144 			}
145 			if(this.nextLength == 0 || this.nextLength > this.maxLength) {
146 				// valid connection but unacceptable length
147 				this.nextLength = 0;
148 				return [];
149 			} else {
150 				return this.receiveImpl();
151 			}
152 		} else {
153 			// read the packet with the given length
154 			while(this.next.length < this.nextLength) {
155 				if(!this.read()) return [];
156 			}
157 			ubyte[] ret = this.next[0..this.nextLength];
158 			this.next = this.next[this.nextLength..$];
159 			this.nextLength = 0;
160 			return ret;
161 		}
162 	}
163 	
164 	/*
165 	 * Returns: true if some data has been received, false if the connection has been closed or timed out
166 	 */
167 	private bool read() {
168 		ubyte[] recv = super.receive();
169 		if(this.lastRecv > 0) {
170 			this.next ~= recv;
171 			return true;
172 		} else {
173 			return false;
174 		}
175 	}
176 	
177 }
178 
179 class CompressedStream(T) : ModifierStream {
180 	
181 	private immutable size_t thresold;
182 	
183 	public this(Stream stream, size_t thresold) {
184 		super(stream);
185 		this.thresold = thresold;
186 	}
187 	
188 	public override ptrdiff_t send(ubyte[] buffer) {
189 		if(buffer.length >= this.thresold) {
190 			auto compress = new Compress();
191 			auto data = compress.compress(buffer);
192 			data ~= compress.flush();
193 			buffer = T.encode(buffer.length.to!uint) ~ cast(ubyte[])data; //TODO more types
194 		} else {
195 			buffer = ubyte.init ~ buffer;
196 		}
197 		return super.send(buffer);
198 	}
199 	
200 	public override ubyte[] receive() {
201 		ubyte[] buffer = super.receive();
202 		uint length = T.fromBuffer(buffer);
203 		if(length != 0) {
204 			// compressed
205 			auto uncompress = new UnCompress(length);
206 			buffer = cast(ubyte[])uncompress.uncompress(buffer.dup);
207 			buffer ~= cast(ubyte[])uncompress.flush();
208 		}
209 		return buffer;
210 	}
211 	
212 }