微信已经开源了mars,但是市面上相关的文章较少,即使有也是多在于使用xlog等这些,那么这次我希望能够从stn这个直接用于im底层通讯的部分进行个分析。
为了能分析的全面些,我们从samples开始。首先明确下,微信用了google的开源协议protobuf库,来代替json和xml。至于为何使用这个,原因还在于效率和传输量上,效率上他能够比json提升将近10倍,而且基于二进制而非文本,传输的大小更加有优势,具体的不再累述,有兴趣的可以自己查查。我们从samples开始看看通过http是怎么获得列表数据的,直接看/mars-master/samples/android/marsSampleChat/app/src/main/java/com/tencent/mars/sample/ConversationActivity.java,这个是个初始的列表界面,需要看的就是这个:/** * pull conversation list from server */ private void updateConversationTopics() { if (taskGetConvList != null) { MarsServiceProxy.cancel(taskGetConvList); } mTextView.setVisibility(View.INVISIBLE); progressBar.setVisibility(View.VISIBLE); swipeRefreshLayout.setRefreshing(true); taskGetConvList = new NanoMarsTaskWrapper( new Main.ConversationListRequest(), new Main.ConversationListResponse() ) { private List dataList = new LinkedList<>(); @Override public void onPreEncode(Main.ConversationListRequest req) { req.type = conversationFilterType; req.accessToken = ""; // TODO: Log.d("xxx", "onPreEncode: " + req.toString()); } @Override public void onPostDecode(Main.ConversationListResponse response) { Log.d("xxx", "onPostDecode: " + response.toString()); } @Override public void onTaskEnd(int errType, int errCode) { Log.d("xxx", "onTaskEnd: " + errType + " " + errCode); runOnUiThread(new Runnable() { @Override public void run() { if (response != null) { for (Main.Conversation conv : response.list) { dataList.add(new Conversation(conv.name, conv.topic, conv.notice)); Log.d("xxx", conv.toString()); } } if (!dataList.isEmpty()) { progressBar.setVisibility(View.INVISIBLE); conversationListAdapter.list.clear(); conversationListAdapter.list.addAll(dataList); conversationListAdapter.notifyDataSetChanged(); swipeRefreshLayout.setRefreshing(false); } else { Log.i(TAG, "getconvlist: empty response list"); progressBar.setVisibility(View.INVISIBLE); mTextView.setVisibility(View.VISIBLE); } } }); } }; MarsServiceProxy.send(taskGetConvList.setHttpRequest(CONVERSATION_HOST, "/mars/getconvlist")); }
new了一个NanoMarsTaskWrapper对象,并Override了几个方法:onPreEncode、onPostDecode、onTaskEnd。分别是编码传输前回调,收到结果解码后回调,任务结束后回调;
设置NanoMarsTaskWrapper对象的http url地址;
通过MarsServiceProxy的send方法,执行发送;
通过这些,我们可以大体了解到,通过一个内置的任务体系,来进行传输的派发调用的;通过服务来驱使整个体系运转,并保证独立性;
其实在目录中已经可以看到了,samples分为2个部分,一个是app,另一个是wrapper,wrapper是jar。
好吧,我们从wrapper入手看下基本结构。首先是manifest:可以看到,独立进程的服务在这里约定了。广播接受者在这里约定了,与服务在同一进程中。
上面app中使用的MarsServiceProxy是个什么东西呢?public class MarsServiceProxy implements ServiceConnection { ...... private MarsServiceProxy() { worker = new Worker(); worker.start(); } public static void init(Context context, Looper looper, String packageName) { if (inst != null) { // TODO: Already initialized return; } gContext = context.getApplicationContext(); gPackageName = (packageName == null ? context.getPackageName() : packageName); gClassName = SERVICE_DEFUALT_CLASSNAME; inst = new MarsServiceProxy(); } ...... }
其实是从ServiceConnection继承下来的服务连接对象,但是他不仅仅是个连接对象。我们看到,他是个单例,在app的SampleApplicaton的onCreate中进行的初始化:
// NOTE: MarsServiceProxy is for client/caller // Initialize MarsServiceProxy for local client, can be moved to other place MarsServiceProxy.init(this, getMainLooper(), null);
app中调用的是send这个静态方法:
public static void send(MarsTaskWrapper marsTaskWrapper) { inst.queue.offer(marsTaskWrapper); }
其实这个方法在操作的是队列LinkedBlockingQueue<MarsTaskWrapper>。看到了吧,这个MarsServiceProxy其实是个api代理,内部有缓存的任务队列,实际上send就是向这个线程安全的队列中加入一项任务MarsTaskWrapper。
暂时放一下,我们关注下他的服务功能。在构造的时候,new了一个Worker,并start了。这个worker就是一个线程:private static class Worker extends Thread { @Override public void run() { while (true) { inst.continueProcessTaskWrappers(); try { Thread.sleep(50); } catch (InterruptedException e) { // } } } }
也就是说,在这个类创建的时候,同时创建了一个工作线程,不断的以间隔50ms循环调用continueProcessTaskWrappers。再看continueProcessTaskWrappers:
private void continueProcessTaskWrappers() { try { if (service == null) { Log.d(TAG, "try to bind remote mars service, packageName: %s, className: %s", gPackageName, gClassName); Intent i = new Intent().setClassName(gPackageName, gClassName); gContext.startService(i); if (!gContext.bindService(i, inst, Service.BIND_AUTO_CREATE)) { Log.e(TAG, "remote mars service bind failed"); } // Waiting for service connected return; } MarsTaskWrapper taskWrapper = queue.take(); if (taskWrapper == null) { // Stop, no more task return; } try { Log.d(TAG, "sending task = %s", taskWrapper); final String cgiPath = taskWrapper.getProperties().getString(MarsTaskProperty.OPTIONS_CGI_PATH); final Integer globalCmdID = GLOBAL_CMD_ID_MAP.get(cgiPath); if (globalCmdID != null) { taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_CMD_ID, globalCmdID); Log.i(TAG, "overwrite cmdID with global cmdID Map: %s -> %d", cgiPath, globalCmdID); } service.send(taskWrapper, taskWrapper.getProperties()); } catch (Exception e) { // RemoteExceptionHandler e.printStackTrace(); } } catch (Exception e) { } }
1.检查服务是否启动,没有则启动并返回等待下一个50ms再继续;
2.从队列中获取一个任务,并给他分配一个cmdID,然后调用MarsService的send方法执行真正的发送事件。其实从上面看,这个服务代理就是做了这些事情,更深入的事情其实是交给了具体的服务进程来做的。这里就是个代理api。好的,我们往下看具体的服务。
首先MarsService是个aidl的定义,不过我们从上面的这个线程循环里就可以看到,启动的服务是根据Intent i = new Intent().setClassName(gPackageName, gClassName);启动的,这个gClassName = SERVICE_DEFUALT_CLASSNAME;就是public static final String SERVICE_DEFUALT_CLASSNAME = "com.tencent.mars.sample.wrapper.service.MarsServiceNative";看到了吧,就是MarsServiceNative。现在起进入到服务里面。public class MarsServiceNative extends Service implements MarsService { private static final String TAG = "Mars.Sample.MarsServiceNative"; private MarsServiceStub stub; ......}
这里保存了一个MarsServiceStub,后面的send都是调用他来实现的,现在暂时先放下send,看下onCreate:
@Override public void onCreate() { super.onCreate(); final MarsServiceProfile profile = gFactory.createMarsServiceProfile(); stub = new MarsServiceStub(this, profile); // set callback AppLogic.setCallBack(stub); StnLogic.setCallBack(stub); SdtLogic.setCallBack(stub); // Initialize the Mars PlatformComm Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper())); // Initialize the Mars StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts()); StnLogic.setShortlinkSvrAddr(profile.shortLinkPort()); StnLogic.setClientVersion(profile.productID()); Mars.onCreate(true); StnLogic.makesureLongLinkConnected(); // Log.d(TAG, "mars service native created"); }
1.创建配置信息类MarsServiceProfile;
2.new出MarsServiceStub来;3.设置各种回调;4.初始化Mars;5.Mars.onCreate(true);6.StnLogic.makesureLongLinkConnected();确认长连接。这里开始用到了Mars了,这个才是核心,并且不在这个工程中。核心的部分我们先放下,下一篇再深入分析。回到MarsServiceStub,看他的send方法:@Override public void send(final MarsTaskWrapper taskWrapper, Bundle taskProperties) throws RemoteException { final StnLogic.Task _task = new StnLogic.Task(StnLogic.Task.EShort, 0, "", null); // Set host & cgi path final String host = taskProperties.getString(MarsTaskProperty.OPTIONS_HOST); final String cgiPath = taskProperties.getString(MarsTaskProperty.OPTIONS_CGI_PATH); _task.shortLinkHostList = new ArrayList<>(); _task.shortLinkHostList.add(host); _task.cgi = cgiPath; final boolean shortSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, true); final boolean longSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, false); if (shortSupport && longSupport) { _task.channelSelect = StnLogic.Task.EBoth; } else if (shortSupport) { _task.channelSelect = StnLogic.Task.EShort; } else if (longSupport) { _task.channelSelect = StnLogic.Task.ELong; } else { Log.e(TAG, "invalid channel strategy"); throw new RemoteException("Invalid Channel Strategy"); } // Set cmdID if necessary int cmdID = taskProperties.getInt(MarsTaskProperty.OPTIONS_CMD_ID, -1); if (cmdID != -1) { _task.cmdID = cmdID; } TASK_ID_TO_WRAPPER.put(_task.taskID, taskWrapper); WRAPPER_TO_TASK_ID.put(taskWrapper, _task.taskID); // Send Log.i(TAG, "now start task with id %d", _task.taskID); StnLogic.startTask(_task); if (StnLogic.hasTask(_task.taskID)) { Log.i(TAG, "stn task started with id %d", _task.taskID); } else { Log.e(TAG, "stn task start failed with id %d", _task.taskID); } }
1.new一个StnLogic.Task;
2.设置task的参数,根据入口的Bundle;3.2个map保存taskID与task的关系;4.StnLogic.startTask(_task);启动任务执行;这里的内容又深入到了Mars核心里,可以看到,关键的处理都是在Mars核心部分完成的,这里的内容甭管是服务还是什么都是在做参数的传递及关系的维护等工作。好吧,我们倒带回来,回到MarsServiceStub,他实现了StnLogic.ICallBack这个interface。定义在mars里:
public interface ICallBack { /** * SDK要求上层做认证操作(可能新发起一个AUTH CGI) * @return */ boolean makesureAuthed(); /** * SDK要求上层做域名解析.上层可以实现传统DNS解析,或者自己实现的域名/IP映射 * @param host * @return */ String[] onNewDns(final String host); /** * 收到SVR PUSH下来的消息 * @param cmdid * @param data */ void onPush(final int cmdid, final byte[] data); /** * SDK要求上层对TASK组包 * @param taskID 任务标识 * @param userContext * @param reqBuffer 组包的BUFFER * @param errCode 组包的错误码 * @return */ boolean req2Buf(final int taskID, Object userContext, ByteArrayOutputStream reqBuffer, int[] errCode, int channelSelect); /** * SDK要求上层对TASK解包 * @param taskID 任务标识 * @param userContext * @param respBuffer 要解包的BUFFER * @param errCode 解包的错误码 * @return int */ int buf2Resp(final int taskID, Object userContext, final byte[] respBuffer, int[] errCode, int channelSelect); /** * 任务结束回调 * @param taskID 任务标识 * @param userContext * @param errType 错误类型 * @param errCode 错误码 * @return */ int onTaskEnd(final int taskID, Object userContext, final int errType, final int errCode); /** * 流量统计 * @param send * @param recv */ void trafficData(final int send, final int recv); /** * 连接状态通知 * @param status 综合状态,即长连+短连的状态 * @param longlinkstatus 仅长连的状态 */ void reportConnectInfo(int status, int longlinkstatus); /** * SDK要求上层生成长链接数据校验包,在长链接连接上之后使用,用于验证SVR身份 * @param identifyReqBuf 校验包数据内容 * @param hashCodeBuffer 校验包的HASH * @param reqRespCmdID 数据校验的CMD ID * @return ECHECK_NOW(需要校验), ECHECK_NEVER(不校验), ECHECK_NEXT(下一次再询问) */ int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID); /** * SDK要求上层解连接校验回包. * @param buffer SVR回复的连接校验包 * @param hashCodeBuffer CLIENT请求的连接校验包的HASH值 * @return */ boolean onLongLinkIdentifyResp(final byte[] buffer, final byte[] hashCodeBuffer); /** * 请求做sync */ void requestDoSync(); String[] requestNetCheckShortLinkHosts(); /** * 是否登录 * @return true 登录 false 未登录 */ boolean isLogoned(); void reportTaskProfile(String taskString); }
可以看到都是回调,通过mars的回调,MarsServiceStub接收到了taskend,并执行了:
@Override public int onTaskEnd(int taskID, Object userContext, int errType, int errCode) { final MarsTaskWrapper wrapper = TASK_ID_TO_WRAPPER.remove(taskID); if (wrapper == null) { Log.w(TAG, "stn task onTaskEnd callback may fail, null wrapper, taskID=%d", taskID); return 0; // TODO: ??? } try { wrapper.onTaskEnd(errType, errCode); } catch (RemoteException e) { e.printStackTrace(); } finally { WRAPPER_TO_TASK_ID.remove(wrapper); // onTaskEnd will be called only once for each task } return 0; }
从map中移除task,然后执行了task自己的onTaskEnd。这样我们正最初的updateConversationTopics里就可以看到后续的更新ui的代码。
下面我们要回到updateConversationTopics附近,看看NanoMarsTaskWrapper:
public abstract class NanoMarsTaskWrapperextends AbstractTaskWrapper {private static final String TAG = "Mars.Sample.NanoMarsTaskWrapper";protected T request;protected R response;public NanoMarsTaskWrapper(T req, R resp) { super(); this.request = req; this.response = resp;}@Overridepublic byte[] req2buf() { try { onPreEncode(request); final byte[] flatArray = new byte[request.getSerializedSize()]; final CodedOutputByteBufferNano output = CodedOutputByteBufferNano.newInstance(flatArray); request.writeTo(output); Log.d(TAG, "encoded request to buffer, [%s]", MemoryDump.dumpHex(flatArray)); return flatArray; } catch (Exception e) { e.printStackTrace(); } return new byte[0];}@Overridepublic int buf2resp(byte[] buf) { try { Log.d(TAG, "decode response buffer, [%s]", MemoryDump.dumpHex(buf)); response = MessageNano.mergeFrom(response, buf); onPostDecode(response); return StnLogic.RESP_FAIL_HANDLE_NORMAL; } catch (Exception e) { Log.e(TAG, "%s", e); } return StnLogic.RESP_FAIL_HANDLE_TASK_END;}public abstract void onPreEncode(T request);public abstract void onPostDecode(R response);
}
1.从AbstractTaskWrapper继承下来;
2.保存了request和response,都是MessageNano类型的(google的protobuf内的message数据类);3.实现了2个接口,分别用来作为request转换为buf何buf转换成为response。其实就是对象转成byte[],byte转成对象;3.在req2buf转换的过程中,调用了request的writeTo方法;4.在buf2resp中,调用了MessageNano.mergeFrom,实际上最终也是调用了response的mergeFrom,见下:/** * Parse {@code data} as a message of this type and merge it with the * message being built. */public static finalT mergeFrom(T msg, final byte[] data) throws InvalidProtocolBufferNanoException { return mergeFrom(msg, data, 0, data.length);}
根据上面的4点可以看到这是个实现序列化及反序列化的过程。google的开源protobuf我们不去关注,但是需要了解的是他是通过以proto为后缀名的配置文件来达到编译时即可生成类的相关代码的程度。
那么这个AbstractTaskWrapper的基类的作用又是什么呢?public abstract class AbstractTaskWrapper extends MarsTaskWrapper.Stub {private Bundle properties = new Bundle();public AbstractTaskWrapper() { // Reflects task properties final TaskProperty taskProperty = this.getClass().getAnnotation(TaskProperty.class); if (taskProperty != null) { setHttpRequest(taskProperty.host(), taskProperty.path()); setShortChannelSupport(taskProperty.shortChannelSupport()); setLongChannelSupport(taskProperty.longChannelSupport()); setCmdID(taskProperty.cmdID()); }}@Overridepublic Bundle getProperties() { return properties;}@Overridepublic abstract void onTaskEnd(int errType, int errCode);public AbstractTaskWrapper setHttpRequest(String host, String path) { properties.putString(MarsTaskProperty.OPTIONS_HOST, ("".equals(host) ? null : host)); properties.putString(MarsTaskProperty.OPTIONS_CGI_PATH, path); return this;}public AbstractTaskWrapper setShortChannelSupport(boolean support) { properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, support); return this;}public AbstractTaskWrapper setLongChannelSupport(boolean support) { properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, support); return this;}public AbstractTaskWrapper setCmdID(int cmdID) { properties.putInt(MarsTaskProperty.OPTIONS_CMD_ID, cmdID); return this;}@Overridepublic String toString() { return "AbsMarsTask: " + BundleFormat.toString(properties);}
}
很简单,就是提供了一些接口来设置传输协议类型,长短连接、http等。
综合来说,这个demo使用了独立的服务框架来进行传输的保证;使用了任务体系来承载每次传输及响应;大量的回调来监控运转过程中的各项关键点;封装了独立的jar wrapper,便于上层的更改及使用;独立的配置类引入支持http和tcp长短连接的使用;protobuf的引入极大提升序列化及反序列化的效率,并降低传输的数据大小;
这篇暂时就到这里吧,后面我们会深入分析下mars的核心部分。