lipermimodその6 PushStream

2019年5月21日

lipermimodその5の続きである。lipermimodについての全投稿は/tag/lipermimodにある。

PullStreamを作成し、それを一方に渡せば、それをInputStreamとしてそこからデータを得ることができる。これは「引っ張る」方式のデータ転送である。

これに対しPushStreamを使うと、これをOutputStreamとして扱い、「押し付ける」形式のデータ転送を行うことができる。

PushStreamによるダウンロードの例

以下はサーバのデータをクライアントにダウンロードする例である。クライアント側がPushStreamを作成し、それをサーバに受け渡す。サーバ側では、これをOutputStreamとしてデータを出力する。すると、クライアント側にそのデータが渡されることになる。


import static org.junit.Assert.*; import java.io.*; import java.util.*; import java.util.stream.*; import org.junit.*; import com.cm55.lipermimod.*; public class PushStream1Test { Server server; Client client; @Before public void before() { server = new Server(); client = new Client(); } @Test public void test() throws Exception { ServerGlobalImpl serverGlobalImpl = new ServerGlobalImpl(); server.registerGlobal("serverGlobal", serverGlobalImpl); server.bind(4455); client.connect("localhost", 4455); ServerGlobal serverGlobal = client.getGlobal("serverGlobal"); // ダウンロードを行う。 ByteArrayOutputStream downloaded = new ByteArrayOutputStream(); serverGlobal.download(new PushStreamDestination(downloaded)); // ダウンロードが終了するまでここには帰ってこない。 // ダウンロードデータが一致するか assertArrayEquals(downloaded.toByteArray(), serverGlobalImpl.downloading); // サーバクローズ server.close(); } public interface ServerGlobal extends IRemote { public void download(PushStream stream) throws IOException; } public static class ServerGlobalImpl implements ServerGlobal { byte[]downloading = randomBytes(13333); public void download(PushStream stream) throws IOException { try (PushStreamSource output = new PushStreamSource(stream)) { output.write(downloading); } } } static byte[] randomBytes(int size) { byte[]buffer = new byte[size]; Random r = new Random(); IntStream.range(0, size).forEach(i->buffer[i] = (byte)r.nextInt()); return buffer; } }

PushStreamによるアップロードの例

以下は、逆方向、PushStreamを使ってクライアントデータをサーバにアップロードする例である。
クライアントは、サーバから取得したPushStreamに対して自らのデータを「押し込む」。


import static org.junit.Assert.*; import java.io.*; import java.util.*; import java.util.stream.*; import org.junit.*; import com.cm55.lipermimod.*; public class PushStream2Test { Server server; Client client; @Before public void before() { server = new Server(); client = new Client(); } @Test public void test() throws Exception { ServerGlobalImpl serverGlobalImpl = new ServerGlobalImpl(); server.registerGlobal("serverGlobal", serverGlobalImpl); server.bind(4455); client.connect("localhost", 4455); ServerGlobal serverGlobal = client.getGlobal("serverGlobal"); // アップロードを行う。 byte[]uploading = randomBytes(13333); try (OutputStream out = new PushStreamSource(serverGlobal.getForUpload())) { out.write(uploading); } // アップロードデータが一致するか assertArrayEquals(uploading, serverGlobalImpl.uploaded.toByteArray()); // サーバクローズ server.close(); } public interface ServerGlobal extends IRemote { public PushStream getForUpload() throws IOException; } public static class ServerGlobalImpl implements ServerGlobal { ByteArrayOutputStream uploaded = new ByteArrayOutputStream(); public PushStream getForUpload() throws IOException { return new PushStreamDestination(uploaded); } } static byte[] randomBytes(int size) { byte[]buffer = new byte[size]; Random r = new Random(); IntStream.range(0, size).forEach(i->buffer[i] = (byte)r.nextInt()); return buffer; } }