RTコンポーネントを Jython で動かすでは、OpenRTM-aist-Python 付属のサンプルプログラムを修正して Jython RTC を作成しましたが、今度はよりポピュラーな方法、RTC Builder で作成した Python RTC コードを Jython 用に修正します。
OpenRTM_aist を Pythonらしく定義することで、修正量を減らすこともできたので、今回はスクリプトも使いません。
これだけのコードが生成されますが、まず idlcompile.* は使いません。削除してしまいます。
通常の Python RTC と違って、idlのコンパイルには idlj を使います。
% idlj -fall MyService.idl
% cd SimpleService % javac *.java 注:MyServicePOA.java の操作は、未チェックまたは安全ではありません。 注:詳細については、-Xlint:unchecked オプションを指定して再コンパイルしてください。
エラーが出る場合、環境変数 CLASSPATH を確認してください。
% echo $CLASSPATH
% export CLASSPATH=.:$CLASSPATH
生成されたソースを修正します。
まず第1行目、shebang行の
#!/usr/bin/env python
# Import RTM module import RTC import OpenRTM_aist
import jp.go.aist.rtm.RTC OpenRTM_aist = jp.go.aist.rtm.RTC OpenRTM_aist.__dict__['Properties'] = jp.go.aist.rtm.RTC.util.Properties OpenRTM_aist.__dict__['CorbaPort'] = jp.go.aist.rtm.RTC.port.CorbaPort
"language", "Python",
self._MyServicePort.registerProvider("myservice0", "SimpleService.MyService", self._myservice0)
return RTC.RTC_OK
return self.super__onInitialize()
コメントアウトされている MyServiceProvider::onRateChanged の定義と、MyServiceInit 関数の定義の間にふたつのクラス MyServiceNewFunc, MyServiceDeleteFunc の定義を追加します。
class MyServiceNewFunc(OpenRTM_aist.RtcNewFunc): def createRtc(self, manager): return MyServiceProvider(manager) class MyServiceDeleteFunc(OpenRTM_aist.RtcDeleteFunc): def deleteRtc(self, rtcBase): rtcBase = None
profile = OpenRTM_aist.Properties(defaults_str=myserviceprovider_spec)
manager.registerFactory(profile, MyServiceProvider, OpenRTM_aist.Delete)
def MyServiceProviderInit(manager): profile = OpenRTM_aist.Properties(myserviceprovider_spec) manager.registerFactory(profile, MyServiceNewFunc(), MyServiceDeleteFunc())
def MyModuleInit(manager):
class MyModuleInitProc(OpenRTM_aist.ModuleInitProc):
MyModuleInitProc クラス(修正後)
class MyModuleInitProc(OpenRTM_aist.ModuleInitProc): def myModuleInit(self, manager): MyServiceProviderInit(manager) # Create a component comp = manager.createComponent("MyServiceProvider")
mgr.setModuleInitProc(MyModuleInit)
mgr.setModuleInitProc(MyModuleInitProc())
MyServiceProvider.py の修正はここまでです。
もともとの2行
import CORBA, PortableServer import SimpleService, SimpleService__POA
import time from SimpleService import MyServicePOA
class MyService_i (SimpleService__POA.MyService):
class MyService_i (MyServicePOA):
class MyService_i (MyServicePOA): def __init__(self): self._echoList = [] self._valueList = [] self._myValue = 0 def echo(self, msg): self._echoList.append(msg) print "MyService::echo() was called." for i in range(10): print "Message: ", msg time.sleep(1) print "MyService::echo() was finished." return msg def get_echo_history(self): print "MyService::get_echo_history() was called." for i in range(len(self._echoList)): print repr(i) + ": " + self._echoList[i] return self._echoList def set_value(self, value): self._valueList.append(value) self._myValue = value print "MyService::set_value() was called." print "Current value: ", self._myValue def get_value(self): print "MyService::get_value() was called." print "Current value: ", self._myValue return float(self._myValue) def get_value_history(self): print "MyService::get_value_history() was called.",len(self._valueList) for i in range(len(self._valueList)): print repr(i) + ": " + repr(self._valueList[i]) return self._valueList
また、こちらにも直接コマンドとして呼ばれたときのためのメインコードがありますが、削除しても問題ありません。
Jython なのでコンパイルの必要はありません。
% jython MyServiceProvider.py
% chmod 755 MyServiceProvider.py % ./MyServiceProvider.py
MyServiceProvider 用のRTC.xmlをコピーして開き、
基本タブのモジュール名を MyServiceConsumer に、サービスポートタブの myservice0 インターフェースの方向を Required に変更します。
CorbaConsumer を使うために、import 節に一行追加します。
# Import RTM module import jp.go.aist.rtm.RTC OpenRTM_aist = jp.go.aist.rtm.RTC OpenRTM_aist.__dict__['Properties'] = jp.go.aist.rtm.RTC.util.Properties OpenRTM_aist.__dict__['CorbaPort'] = jp.go.aist.rtm.RTC.port.CorbaPort OpenRTM_aist.__dict__['CorbaConsumer'] = jp.go.aist.rtm.RTC.port.CorbaConsumer # この行を追加
# Import Service stub modules # <rtc-template block="consumer_import"> from SimpleService import MyService # この行を追加
self._myservice = OpenRTM_aist.CorbaConsumer(MyService)
onExecute の実装例です。
def onExecute(self, ec_id): print "onExecute" print "\n" print "Command list: " print " echo [msg] : echo message." print " set_value [value]: set value." print " get_value : get current value." print " get_echo_history : get input messsage history." print " get_value_history: get input value history." print "> ", args = str(sys.stdin.readline()) argv = string.split(args) argv[-1] = argv[-1].rstrip("\n") if argv[0] == "echo" and len(argv) > 1: retmsg = self._myservice._ptr().echo(argv[1]) print "echo() return: ", retmsg return self.super__onExecute(ec_id) if argv[0] == "set_value" and len(argv) > 1: val = float(argv[1]) self._myservice._ptr().set_value(val) print "Set remote value: ", val return self.super__onExecute(ec_id) if argv[0] == "get_value": retval = self._myservice._ptr().get_value() print "Current remote value: ", retval return self.super__onExecute(ec_id) if argv[0] == "get_echo_history": echo_history = self._myservice._ptr().get_echo_history() for i in range(len(echo_history)): print repr(i) + ": " + echo_history[i] return self.super__onExecute(ec_id) if argv[0] == "get_value_history": value_history = self._myservice._ptr().get_value_history() print value_history, len(value_history) for i in range(len(value_history)): print repr(i) + ": " + repr(value_history[i]) return self.super__onExecute(ec_id) print "Invalid command or argument(s)." return self.super__onExecute(ec_id)