00001 <?php
00002
00003
00004
00005
00006
00007
00008
00009
00010
/**
 * PEIP_Pipe
 *
 * A message handler that is also a subscribable channel and an event source.
 *
 * - Messages passed to send() are routed through handle() (defined outside
 *   this class). Replies are forwarded to the output channel when one is set,
 *   otherwise they are published to the handlers subscribed to this pipe.
 * - Life-cycle events ('prePublish', 'postPublish', 'subscribe',
 *   'unsubscribe', 'connect', 'disconnect', 'preCommand', 'postCommand',
 *   'setMessageDispatcher', 'setEventDispatcher') are fired through an
 *   object event dispatcher, which defaults to one instance shared by all pipes.
 * - Named commands can be registered and triggered through command messages.
 */
class PEIP_Pipe
    extends PEIP_ABS_Reply_Producing_Message_Handler
    implements
        PEIP_INF_Channel,
        PEIP_INF_Subscribable_Channel,
        PEIP_INF_Connectable {

    /** @var PEIP_Object_Event_Dispatcher|null event dispatcher; resolved lazily by getEventDispatcher() */
    protected $eventDispatcher;

    /**
     * NOTE(review): declared but never used by the code in this class; the
     * message dispatcher is actually kept in the undeclared property
     * `$this->dispatcher` (see setMessageDispatcher()/getMessageDispatcher()).
     * Left as-is because code outside this file may rely on either name.
     */
    protected $messageDispatcher;

    /** @var string|null name of this pipe */
    protected $name;

    /** @var array map of command name => callable, filled by registerCommand() */
    protected $commands = array();

    /** @var PEIP_Object_Event_Dispatcher|null default event dispatcher shared by all instances */
    protected static $sharedEventDispatcher;

    /**
     * Sets the name of the pipe.
     *
     * @param string $name the name to store
     * @return void
     */
    public function setName($name){
        $this->name = $name;
    }

    /**
     * Returns the name of the pipe.
     *
     * @return string|null the name previously set with setName(), if any
     */
    public function getName(){
        return $this->name;
    }

    /**
     * Sends a message on the pipe by passing it to handle().
     *
     * @param PEIP_INF_Message $message the message to send
     * @param integer $timeout accepted for interface compatibility; not used here
     * @return mixed whatever handle() returns
     */
    public function send(PEIP_INF_Message $message, $timeout = -1){
        return $this->handle($message);
    }

    /**
     * Publishes a message to the subscribed handlers.
     *
     * Fires 'prePublish' before and 'postPublish' after notifying the
     * message dispatcher; both events carry the message under 'MESSAGE'.
     *
     * @param PEIP_INF_Message $message the message to publish
     * @return boolean always true
     */
    protected function doSend(PEIP_INF_Message $message){
        $headers = array('MESSAGE'=>$message);
        $this->doFireEvent('prePublish', $headers);
        $this->getMessageDispatcher()->notify($message);
        $this->doFireEvent('postPublish', $headers);
        return true;
    }

    /**
     * Sends a reply.
     *
     * The content is passed through ensureMessage() first. The resulting
     * message goes to the output channel when one is set; otherwise it is
     * published to this pipe's own subscribers.
     *
     * @param mixed $content the reply content or message
     * @return void
     */
    protected function replyMessage($content){
        $message = $this->ensureMessage($content);
        $outputChannel = $this->getOutputChannel();
        if($outputChannel){
            $outputChannel->send($message);
            return;
        }
        $this->doSend($message);
    }

    /**
     * Default reply behaviour: forwards the given message unchanged.
     *
     * @param PEIP_INF_Message $message the message to reply with
     * @return void
     */
    protected function doReply(PEIP_INF_Message $message){
        $this->replyMessage($message);
    }

    /**
     * Subscribes a handler to the messages published on this pipe.
     *
     * Fires 'subscribe' with the handler under 'SUBSCRIBER'.
     *
     * @param PEIP_INF_Handler $handler the handler to subscribe
     * @return void
     */
    public function subscribe(PEIP_INF_Handler $handler){
        $this->getMessageDispatcher()->connect($handler);
        $this->doFireEvent('subscribe', array('SUBSCRIBER'=>$handler));
    }

    /**
     * Unsubscribes a handler from this pipe.
     *
     * Fires 'unsubscribe' with the handler under 'SUBSCRIBER'.
     *
     * @param PEIP_INF_Handler $handler the handler to unsubscribe
     * @return void
     */
    public function unsubscribe(PEIP_INF_Handler $handler){
        $this->getMessageDispatcher()->disconnect($handler);
        $this->doFireEvent('unsubscribe', array('SUBSCRIBER'=>$handler));
    }

    /**
     * Replaces the message dispatcher.
     *
     * When a dispatcher is already in place and $transferListeners is true,
     * its listeners are moved (connected to the new one, disconnected from
     * the old one). Fires 'setMessageDispatcher' with the new dispatcher
     * under 'DISPATCHER'.
     *
     * @param PEIP_INF_Dispatcher $dispatcher the new message dispatcher
     * @param boolean $transferListeners whether to move existing listeners over
     * @return void
     */
    public function setMessageDispatcher(PEIP_INF_Dispatcher $dispatcher, $transferListeners = true){
        // Skip the transfer when the very same dispatcher is passed in again:
        // connecting and then disconnecting each listener on one object would
        // pointlessly churn it and could leave it without its subscribers.
        $mustTransfer = $transferListeners
            && isset($this->dispatcher)
            && $this->dispatcher !== $dispatcher;
        if($mustTransfer){
            foreach($this->dispatcher->getListeners() as $listener){
                $dispatcher->connect($listener);
                $this->dispatcher->disconnect($listener);
            }
        }
        $this->dispatcher = $dispatcher;
        $this->doFireEvent('setMessageDispatcher', array('DISPATCHER'=>$dispatcher));
    }

    /**
     * Returns the message dispatcher, creating a PEIP_Dispatcher on first use.
     *
     * @return PEIP_INF_Dispatcher the message dispatcher in use
     */
    public function getMessageDispatcher(){
        if(!isset($this->dispatcher)){
            $this->dispatcher = new PEIP_Dispatcher;
        }
        return $this->dispatcher;
    }

    /**
     * Connects a listener to an event fired by this pipe.
     *
     * Fires 'connect' with the event name under 'EVENT' and the listener
     * under 'LISTENER'.
     *
     * @param string $name the event name
     * @param PEIP_INF_Handler $listener the listener to connect
     * @return void
     */
    public function connect($name, PEIP_INF_Handler $listener){
        $this->getEventDispatcher()->connect($name, $this, $listener);
        $this->doFireEvent('connect', array('EVENT'=>$name, 'LISTENER'=>$listener));
    }

    /**
     * Disconnects a listener from an event fired by this pipe.
     *
     * Fires 'disconnect' with the event name under 'EVENT' and the listener
     * under 'LISTENER'.
     *
     * @param string $name the event name
     * @param PEIP_INF_Handler $listener the listener to disconnect
     * @return void
     */
    public function disconnect($name, PEIP_INF_Handler $listener){
        $this->getEventDispatcher()->disconnect($name, $this, $listener);
        $this->doFireEvent('disconnect', array('EVENT'=>$name, 'LISTENER'=>$listener));
    }

    /**
     * Tells whether listeners are connected to the given event of this pipe.
     *
     * @param string $name the event name
     * @return mixed result of the event dispatcher's hasListeners() call
     */
    public function hasListeners($name){
        return $this->getEventDispatcher()->hasListeners($name, $this);
    }

    /**
     * Returns the listeners connected to the given event of this pipe.
     *
     * @param string $name the event name
     * @return mixed result of the event dispatcher's getListeners() call
     */
    public function getListeners($name){
        return $this->getEventDispatcher()->getListeners($name, $this);
    }

    /**
     * Connects a plain callable to an event by wrapping it in a
     * PEIP_Callable_Handler.
     *
     * @param string $eventName the event name
     * @param callable $callable the callable to invoke for the event
     * @return void
     */
    public function connectCall($eventName, $callable){
        $this->connect($eventName, new PEIP_Callable_Handler($callable));
    }

    /**
     * Disconnects a callable previously connected with connectCall().
     *
     * Every PEIP_Callable_Handler listening on the event whose callable
     * loosely equals (==) the given one is disconnected.
     *
     * @param string $eventName the event name
     * @param callable $callable the callable to disconnect
     * @return void
     */
    public function disconnectCall($eventName, $callable){
        // Same guard as setEventDispatcher(): only ask for listeners when some exist.
        if(!$this->hasListeners($eventName)){
            return;
        }
        foreach($this->getListeners($eventName) as $handler){
            if($handler instanceof PEIP_Callable_Handler && $handler->getCallable() == $callable){
                $this->disconnect($eventName, $handler);
            }
        }
    }

    /**
     * Registers a callable under a command name for use by command().
     *
     * @param string $commandName the command name
     * @param callable $callable the callable to run for that command
     * @return void
     */
    protected function registerCommand($commandName, $callable){
        $this->commands[$commandName] = $callable;
    }

    /**
     * Executes the command named in the message's 'COMMAND' header.
     *
     * The header value is cast to string and trimmed. When it names a
     * registered command, the callable is invoked with the message content.
     * 'preCommand' and 'postCommand' are fired around the lookup in any
     * case, each carrying the message under 'MESSAGE'.
     *
     * @param PEIP_INF_Message $cmdMessage the command message
     * @return void
     */
    public function command(PEIP_INF_Message $cmdMessage){
        $headers = array('MESSAGE'=>$cmdMessage);
        $this->doFireEvent('preCommand', $headers);
        $cmd = trim((string)$cmdMessage->getHeader('COMMAND'));
        if($cmd !== '' && array_key_exists($cmd, $this->commands)){
            call_user_func($this->commands[$cmd], $cmdMessage->getContent());
        }
        $this->doFireEvent('postCommand', $headers);
    }

    /**
     * Replaces the event dispatcher.
     *
     * When $transferListners is true and a different dispatcher is already
     * in place, every listener registered for this pipe on the old
     * dispatcher is also connected to the new one (the old registrations
     * are not removed). Fires 'setEventDispatcher' on the new dispatcher
     * with it under 'DISPATCHER'.
     *
     * @param PEIP_Object_Event_Dispatcher $dispatcher the new event dispatcher
     * @param boolean $transferListners whether to copy existing listeners over
     * @return void
     */
    public function setEventDispatcher(PEIP_Object_Event_Dispatcher $dispatcher, $transferListners = true){
        $previous = $this->eventDispatcher;
        // Nothing to copy when no dispatcher has been resolved yet, and
        // copying onto the same instance would only duplicate registrations.
        if($transferListners && $previous && $previous !== $dispatcher){
            foreach($previous->getEventNames($this) as $name){
                if(!$previous->hasListeners($name, $this)){
                    continue;
                }
                foreach($previous->getListeners($name, $this) as $listener){
                    $dispatcher->connect($name, $this, $listener);
                }
            }
        }
        $this->eventDispatcher = $dispatcher;
        $this->doFireEvent('setEventDispatcher', array('DISPATCHER'=>$dispatcher));
    }

    /**
     * Returns the event dispatcher, falling back to (and remembering) the
     * shared dispatcher when none has been set.
     *
     * @return PEIP_Object_Event_Dispatcher the event dispatcher in use
     */
    public function getEventDispatcher(){
        if(!$this->eventDispatcher){
            $this->eventDispatcher = self::getSharedEventDispatcher();
        }
        return $this->eventDispatcher;
    }

    /**
     * Returns the event dispatcher shared by all instances, creating it on
     * first use.
     *
     * @return PEIP_Object_Event_Dispatcher the shared event dispatcher
     */
    protected static function getSharedEventDispatcher(){
        if(!self::$sharedEventDispatcher){
            self::$sharedEventDispatcher = new PEIP_Object_Event_Dispatcher;
        }
        return self::$sharedEventDispatcher;
    }

    /**
     * Builds and fires an event with this pipe as its subject.
     *
     * @param string $name the event name
     * @param array $headers headers to attach to the event
     * @param string|boolean $eventClass passed through to buildAndNotify(); false by default
     * @return mixed whatever the event dispatcher's buildAndNotify() returns
     */
    protected function doFireEvent($name, array $headers = array(), $eventClass = false){
        return $this->getEventDispatcher()->buildAndNotify($name, $this, $headers, $eventClass);
    }

}