2013-01-21—–2013-01-27
技术:
- 本周技术上最关心的倒是lucene4.1 release了,其中几个主要的特性是:
2013-01-21—–2013-01-27
技术:
去年的时候,往美股账户里打了2100$。当时玩美股的一个思路就是:不是XX公司的员工,但我可以是其股东,哈哈。
开始时关注的股票有360, facebook。但可惜的是当它们在20以下时,委托了后没有买进来。后续只能看着他们一路往上涨。对此自己的总结是:执行不到位,不按既定目标执行是最大败笔。当facebook在19以下时,自己觉得是可以入手(唯一的判断标准就是股价快腰斩了),但在实际操作时,总希望买的更低一点,委托价偏低。
在这个月,入手了yy,价格为13.6。
这是对自己之前在工作中一实现和EventBus 实现的差距分析
业务场景:
在系统中经常会有一个component产生一份数据,在完成自己的业务时,还需要交给另外的component进行一些额外的处理。这其实已经是publish-subscribe-style communication。
在love中类似的场景是有:
1) 用户发表了一个宝贝,在完成所有发表相关操作后,还需要实时分析检测模块来确定当前发表的宝贝是否需要在前台展现,如果没有这模块的话,发表的推广宝贝在增量更新后马上在前台最近列表下展现出来。
检测分为两种,用户分享的宝贝是不是大部分是来自同一店铺 ,分享宝贝的理由是不是广告信息。
2)用户对宝贝,专辑进行评论。在完成评论后,也需要一个模块来分析当前评论是否是垃圾评论,如果是的话,直接将这评论实时删除。
应该说还有其它类似的业务需求都可以归类为publish-subscribe。
做publish-subscribe这样的体系结构可以分为两个层面:
1)系统之间的,这时我们可以采用notify来完成
2)系统内component之间的, 可以自己通过队列来完成
在有些场景下,我们可以把系统间的publish-subscribe弱化成系统内component之间的。如场景1),2),我们可以另外搭建一个应用,通过notify来传递信息,辅以一个中心存储点,也可以直接在单台jvm内完成。
ugly的实现:
思路:利用queue来交换数据。
交换容器:
/**
* size在写入时控制,防止oom
*/
ConcurrentLinkedQueue<InfoDO> infoQueue = new ConcurrentLinkedQueue<InfoDO>();
添加入口(publish):
public boolean add(InfoDO infoDO) {
……. info本身事先分析,过滤
result = infoQueue.add(copyInfo);
return result;
}
consume机制:
起一个线程,不停地扫queue中的数据
class AnalysisThread extends Thread {
@Override
public void run() {
while (true) {
…………………….
if (infoQueue.isEmpty()) {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
logger.warn(“AnalysisContainer-Exception-info:{}”, e);
}
continue;
}
InfoDO info = infoQueue.poll();
if (info != null) {
try {
sameShopAnalysis.analysis(info);
} catch (Throwable t) {
logger.warn(“AnalysisContainer-analysis-info:{}”, t);
}
………………
}
这里的sameShopAnalysis是作为一个property注入进来的,
当有其它类型数据需要分析时,需要注入新的bean,并且overwrite add()接口,同时需要修改while(true)中的代码。
所有这些为的都是这个异步线程能及时描述到新增的数据,同时拿出来后交给对应compoent中处理此业务的bean。
优点:
在一定程度上已经隔离了两个component之间的相互感知,只需要通过所有publish端只需要调用 AnalysisContainer 的add接口。
存在的改进点:
这个Analysiscontainer没有共用性,每次新增数据类型时,得改动这里的代码。这个只能是最简单版本的publish-subscribe。
改进思路:
需要做的是把这个dispatch统一掉,形成一个公共的类,对数据类型,subscribe(consumer)透明。
EventBus是如何设计的:
The EventBus system and code use the following terms to discuss event distribution:
Event (类似于AnalysisContainer) | Any object that may be posted to a bus. |
Subscribing(类似于property中各个初始化的Analysis进行赋值) | The act of registering a listener with an EventBus, so that its handler methods will receive events. |
Listener(类似于sameShopAnalysis,各个分析bean) | An object that wishes to receive events, by exposing handler methods. |
Handler method(类似于analysis接口中的analysis()方法) | A public method that the EventBus should use to deliver posted events. Handler methods are marked by the Subscribe annotation. |
Posting an event(类似于AnalysisContainer中的add()方法) | Making the event available to any listeners through the EventBus. |
因为在AnalysisContainer中,是通过hard code将Event和Listener关联在一起的,如果要想去掉hard code,就需要定义一定的规则,并且利用通过的技术手段关联起来。
Listener实现:
1) 突破了 Analysis的接口实现约定(因为是硬编码,其实可以不遵守接口实现),可以是任何bean,但需要通过 @Subscribe 这个annotation来指明想让bean中哪个方法来receive Event。
Suscribing:
1)在目前的实现中,是通过property new一个新值 ,赋值操作完成。
在EventBus中,通过register()方法来完成。但在这register过程中,会分析这个bean中哪些方法使用了@Subscribe , 如果使用了,则提取出这个方法,并且分析第一个参数bean的 接口,类集合(the set of interfaces and classes that this type is or is a subtype of).
这是很关键的一个设计,当有Event 提交时,就是根据Event的 接口,类集合 来匹配这些参数的接口,类集合,再进一步找到对应 handler及方法。
系统中的改进:
因此可以拿掉系统中那ugly的AnalysisContainer代码,
替换成EventBus, 将每个analysis接口的实现类register进来,同时使用@Subscribe来指明handler method。
注意点:
因为分析还是相对比较耗时,
When post is called, all registered handlers for an event are run in sequence, so handlers should be reasonably quick. If an event may trigger an extended process (such as a database load), spawn a thread or queue it for later. (For a convenient way to do this, use an AsyncEventBus.)