lipermimodその5 PullStream
lipermimodその4の続きである。lipermimodについての全投稿は/tag/lipermimodにある。
今回は、クライアント・サーバ間での大きなデータの転送を行う仕組みについて説明する。
例えば、大きなファイルの中身のデータをクライアントからサーバへ、サーバからクライアントへ転送したいとする。通常のJavaコードであれば、呼び出し側がオープンしたInputStreamを引数として何らかのメソッドを呼び出し、そのメソッドの中で読み出してもらうことが考えられる。
が、もちろんクライアントからサーバに対してInputStreamを送ることはできない。たとえ送れたとしても正常に動作はしない。サーバからクライアントの方向も全く同じである。
この問題を解決するために、便利な仕組みを用意してある。
PullStreamによるアップロード
その一つがPullStream、PullStreamSource、PullStreamDestinationである。
これはいわばInputStreamの代わりとなるもので、PullStreamを受け取った側はそこからデータを「引っ張る」ことになる。以下に例を示す。
クライアント側は、転送したいInputStreamをPullStreamSourceにラップしてサーバ側にわたす。もちろん、このオブジェクトはSerializableではないので、中身が転送されるわけではなく、PullStreamインターフェースのみが転送される。
PullStreamインターフェースを受け取ったサーバ側では、それをPullStreamDestinationにラップし、これをInputStreamとして読み出せばよい。
import static org.junit.Assert.*;
import java.io.*;
import java.util.*;
import java.util.stream.*;
import org.junit.*;
import com.cm55.lipermimod.*;
public class PullStreamTest {
Server server;
Client client;
@Before
public void before() {
server = new Server();
client = new Client();
}
@Test
public void test() throws Exception {
// ServerGlobalImplを作成して、サーバスタート
ServerGlobalImpl serverGlobalImpl = new ServerGlobalImpl();
server.registerGlobal("serverGlobal", serverGlobalImpl);
server.bind(4455);
// クライアントがサーバに接続して、インターフェース取得
client.connect("localhost", 4455);
ServerGlobal serverGlobal = client.getGlobal("serverGlobal");
// 適当なデータをアップロードする
byte[]uploading = randomBytes(3333);
serverGlobal.upload(new PullStreamSource(new ByteArrayInputStream(uploading)));
// アップロードが終了するまでここには帰ってこない。
// アップロードデータが一致するか
assertArrayEquals(uploading, serverGlobalImpl.uploaded);
// サーバクローズ
server.close();
}
public interface ServerGlobal extends IRemote {
public void upload(PullStream stream);
}
public static class ServerGlobalImpl implements ServerGlobal {
byte[]uploaded;
public void upload(PullStream stream) {
try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
PullStreamDestination client = new PullStreamDestination(stream)) {
copy(client, bytes);
uploaded = bytes.toByteArray();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
static byte[] randomBytes(int size) {
byte[]buffer = new byte[size];
Random r = new Random();
IntStream.range(0, size).forEach(i->buffer[i] = (byte)r.nextInt());
return buffer;
}
static void copy(InputStream in, OutputStream out) throws IOException {
byte[]buf = new byte[4096];
while (true) {
int size = in.read(buf);
if (size <= 0) break;
out.write(buf, 0, size);
}
}
}
PullStreamによるダウンロード
PullStreamはサーバからのデータダウンロードにも使える。サーバからPullStreamを取得し、それを読み出せばよい。
import static org.junit.Assert.*;
import java.io.*;
import java.util.*;
import java.util.stream.*;
import org.junit.*;
import com.cm55.lipermimod.*;
public class PullStream3Test {
Server server;
Client client;
@Before
public void before() {
server = new Server();
client = new Client();
}
@Test
public void test() throws Exception {
// ServerGlobalImplを作成して、サーバスタート
ServerGlobalImpl serverGlobalImpl = new ServerGlobalImpl();
server.registerGlobal("serverGlobal", serverGlobalImpl);
server.bind(4455);
// クライアントがサーバに接続して、インターフェース取得
client.connect("localhost", 4455);
ServerGlobal serverGlobal = client.getGlobal("serverGlobal");
// データをダウンロードする
byte[]downloaded;
try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
InputStream in = new PullStreamDestination(serverGlobal.download())) {
copy(in, bytes);
downloaded = bytes.toByteArray();
} catch (IOException ex) {
ex.printStackTrace();
return;
}
// アップロードデータが一致するか
assertArrayEquals(downloaded, serverGlobalImpl.downloading);
// サーバクローズ
server.close();
}
public interface ServerGlobal extends IRemote {
public PullStream download();
}
public static class ServerGlobalImpl implements ServerGlobal {
byte[]downloading = randomBytes(3333);
public PullStream download() {
return new PullStreamSource(new ByteArrayInputStream(downloading));
}
}
static byte[] randomBytes(int size) {
byte[]buffer = new byte[size];
Random r = new Random();
IntStream.range(0, size).forEach(i->buffer[i] = (byte)r.nextInt());
return buffer;
}
static void copy(InputStream in, OutputStream out) throws IOException {
byte[]buf = new byte[4096];
while (true) {
int size = in.read(buf);
if (size <= 0) break;
out.write(buf, 0, size);
}
}
}