lipermimodその5 PullStream

2018年5月29日

lipermimodその4の続きである。今回は、クライアント・サーバ間での大きなデータの転送を行う仕組みについて説明する。

例えば、大きなファイルの中身のデータをクライアントからサーバへ、サーバからクライアントへ転送したいとする。通常のJavaコードであれば、呼び出し側がオープンしたInputStreamを引数として何らかのメソッドを呼び出し、そのメソッドの中で読み出してもらうことが考えられる。

が、もちろんクライアントからサーバに対してInputStreamを送ることはできない。たとえ送れたとしても正常に動作はしない。サーバからクライアントの方向も全く同じである。

この問題を解決するために、便利な仕組みを用意してある。

PullStreamによるアップロード

その一つがPullStream、PullStreamSource、PullStreamDestinationである。
これはいわばInputStreamの代わりとなるもので、PullStreamを受け取った側はそこからデータを「引っ張る」ことになる。以下に例を示す。

クライアント側は、転送したいInputStreamをPullStreamSourceにラップしてサーバ側にわたす。もちろん、このオブジェクトはSerializableではないので、中身が転送されるわけではなく、PullStreamインターフェースのみが転送される。

PullStreamインターフェースを受け取ったサーバ側では、それをPullStreamDestinationにラップし、これをInputStreamとして読み出せばよい。


import static org.junit.Assert.*; import java.io.*; import java.util.*; import java.util.stream.*; import org.junit.*; import com.cm55.lipermimod.*; public class PullStreamTest { Server server; Client client; @Before public void before() { server = new Server(); client = new Client(); } @Test public void test() throws Exception { // ServerGlobalImplを作成して、サーバスタート ServerGlobalImpl serverGlobalImpl = new ServerGlobalImpl(); server.registerGlobal("serverGlobal", serverGlobalImpl); server.bind(4455); // クライアントがサーバに接続して、インターフェース取得 client.connect("localhost", 4455); ServerGlobal serverGlobal = client.getGlobal("serverGlobal"); // 適当なデータをアップロードする byte[]uploading = randomBytes(3333); serverGlobal.upload(new PullStreamSource(new ByteArrayInputStream(uploading))); // アップロードが終了するまでここには帰ってこない。 // アップロードデータが一致するか assertArrayEquals(uploading, serverGlobalImpl.uploaded); // サーバクローズ server.close(); } public interface ServerGlobal extends IRemote { public void upload(PullStream stream); } public static class ServerGlobalImpl implements ServerGlobal { byte[]uploaded; public void upload(PullStream stream) { try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); PullStreamDestination client = new PullStreamDestination(stream)) { copy(client, bytes); uploaded = bytes.toByteArray(); } catch (IOException ex) { ex.printStackTrace(); } } } static byte[] randomBytes(int size) { byte[]buffer = new byte[size]; Random r = new Random(); IntStream.range(0, size).forEach(i->buffer[i] = (byte)r.nextInt()); return buffer; } static void copy(InputStream in, OutputStream out) throws IOException { byte[]buf = new byte[4096]; while (true) { int size = in.read(buf); if (size <= 0) break; out.write(buf, 0, size); } } }

PullStreamによるダウンロード

PullStreamはサーバからのデータダウンロードにも使える。サーバからPullStreamを取得し、それを読み出せばよい。


import static org.junit.Assert.*; import java.io.*; import java.util.*; import java.util.stream.*; import org.junit.*; import com.cm55.lipermimod.*; public class PullStream3Test { Server server; Client client; @Before public void before() { server = new Server(); client = new Client(); } @Test public void test() throws Exception { // ServerGlobalImplを作成して、サーバスタート ServerGlobalImpl serverGlobalImpl = new ServerGlobalImpl(); server.registerGlobal("serverGlobal", serverGlobalImpl); server.bind(4455); // クライアントがサーバに接続して、インターフェース取得 client.connect("localhost", 4455); ServerGlobal serverGlobal = client.getGlobal("serverGlobal"); // データをダウンロードする byte[]downloaded; try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); InputStream in = new PullStreamDestination(serverGlobal.download())) { copy(in, bytes); downloaded = bytes.toByteArray(); } catch (IOException ex) { ex.printStackTrace(); return; } // アップロードデータが一致するか assertArrayEquals(downloaded, serverGlobalImpl.downloading); // サーバクローズ server.close(); } public interface ServerGlobal extends IRemote { public PullStream download(); } public static class ServerGlobalImpl implements ServerGlobal { byte[]downloading = randomBytes(3333); public PullStream download() { return new PullStreamSource(new ByteArrayInputStream(downloading)); } } static byte[] randomBytes(int size) { byte[]buffer = new byte[size]; Random r = new Random(); IntStream.range(0, size).forEach(i->buffer[i] = (byte)r.nextInt()); return buffer; } static void copy(InputStream in, OutputStream out) throws IOException { byte[]buf = new byte[4096]; while (true) { int size = in.read(buf); if (size <= 0) break; out.write(buf, 0, size); } } }