RTコンポーネントを Jython で動かすでは、OpenRTM-aist-Python 付属のサンプルプログラムを修正して Jython RTC を作成しましたが、今度はよりポピュラーな方法、RTC Builder で作成した Python RTC コードを Jython 用に修正します。
OpenRTM_aist を Pythonらしく定義することで、修正量を減らすこともできたので、今回はスクリプトも使いません。
これだけのコードが生成されますが、まず idlcompile.* は使いません。削除してしまいます。
通常の Python RTC と違って、idlのコンパイルには idlj を使います。
% idlj -fall MyService.idl
% cd SimpleService % javac *.java 注:MyServicePOA.java の操作は、未チェックまたは安全ではありません。 注:詳細については、-Xlint:unchecked オプションを指定して再コンパイルしてください。
エラーが出る場合、環境変数 CLASSPATH を確認してください。
% echo $CLASSPATH
% export CLASSPATH=.:$CLASSPATH
生成されたソースを修正します。
まず第1行目、shebang行の
#!/usr/bin/env python
# Import RTM module import RTC import OpenRTM_aist
import jp.go.aist.rtm.RTC OpenRTM_aist = jp.go.aist.rtm.RTC OpenRTM_aist.__dict__['Properties'] = jp.go.aist.rtm.RTC.util.Properties OpenRTM_aist.__dict__['CorbaPort'] = jp.go.aist.rtm.RTC.port.CorbaPort
"language", "Python",
self._MyServicePort.registerProvider("myservice0", "SimpleService.MyService", self._myservice0)return RTC.RTC_OK
return self.super__onInitialize()
コメントアウトされている MyServiceProvider::onRateChanged の定義と、MyServiceInit 関数の定義の間にふたつのクラス MyServiceNewFunc, MyServiceDeleteFunc の定義を追加します。
class MyServiceNewFunc(OpenRTM_aist.RtcNewFunc):
def createRtc(self, manager):
return MyServiceProvider(manager)
class MyServiceDeleteFunc(OpenRTM_aist.RtcDeleteFunc):
def deleteRtc(self, rtcBase):
rtcBase = None
profile = OpenRTM_aist.Properties(defaults_str=myserviceprovider_spec)
manager.registerFactory(profile,
MyServiceProvider,
OpenRTM_aist.Delete) def MyServiceProviderInit(manager):
profile = OpenRTM_aist.Properties(myserviceprovider_spec)
manager.registerFactory(profile,
MyServiceNewFunc(),
MyServiceDeleteFunc())
def MyModuleInit(manager):
class MyModuleInitProc(OpenRTM_aist.ModuleInitProc):
MyModuleInitProc クラス(修正後)
class MyModuleInitProc(OpenRTM_aist.ModuleInitProc):
def myModuleInit(self, manager):
MyServiceProviderInit(manager)
# Create a component
comp = manager.createComponent("MyServiceProvider")
mgr.setModuleInitProc(MyModuleInit)
mgr.setModuleInitProc(MyModuleInitProc())
MyServiceProvider.py の修正はここまでです。
もともとの2行
import CORBA, PortableServer import SimpleService, SimpleService__POA
import time from SimpleService import MyServicePOA
class MyService_i (SimpleService__POA.MyService):
class MyService_i (MyServicePOA):
class MyService_i (MyServicePOA):
def __init__(self):
self._echoList = []
self._valueList = []
self._myValue = 0
def echo(self, msg):
self._echoList.append(msg)
print "MyService::echo() was called."
for i in range(10):
print "Message: ", msg
time.sleep(1)
print "MyService::echo() was finished."
return msg
def get_echo_history(self):
print "MyService::get_echo_history() was called."
for i in range(len(self._echoList)):
print repr(i) + ": " + self._echoList[i]
return self._echoList
def set_value(self, value):
self._valueList.append(value)
self._myValue = value
print "MyService::set_value() was called."
print "Current value: ", self._myValue
def get_value(self):
print "MyService::get_value() was called."
print "Current value: ", self._myValue
return float(self._myValue)
def get_value_history(self):
print "MyService::get_value_history() was called.",len(self._valueList)
for i in range(len(self._valueList)):
print repr(i) + ": " + repr(self._valueList[i])
return self._valueList
また、こちらにも直接コマンドとして呼ばれたときのためのメインコードがありますが、削除しても問題ありません。
Jython なのでコンパイルの必要はありません。
% jython MyServiceProvider.py
% chmod 755 MyServiceProvider.py % ./MyServiceProvider.py
MyServiceProvider 用のRTC.xmlをコピーして開き、
基本タブのモジュール名を MyServiceConsumer に、サービスポートタブの myservice0 インターフェースの方向を Required に変更します。
CorbaConsumer を使うために、import 節に一行追加します。
# Import RTM module import jp.go.aist.rtm.RTC OpenRTM_aist = jp.go.aist.rtm.RTC OpenRTM_aist.__dict__['Properties'] = jp.go.aist.rtm.RTC.util.Properties OpenRTM_aist.__dict__['CorbaPort'] = jp.go.aist.rtm.RTC.port.CorbaPort OpenRTM_aist.__dict__['CorbaConsumer'] = jp.go.aist.rtm.RTC.port.CorbaConsumer # この行を追加
# Import Service stub modules # <rtc-template block="consumer_import"> from SimpleService import MyService # この行を追加
self._myservice = OpenRTM_aist.CorbaConsumer(MyService)
onExecute の実装例です。
def onExecute(self, ec_id):
print "onExecute"
print "\n"
print "Command list: "
print " echo [msg] : echo message."
print " set_value [value]: set value."
print " get_value : get current value."
print " get_echo_history : get input messsage history."
print " get_value_history: get input value history."
print "> ",
args = str(sys.stdin.readline())
argv = string.split(args)
argv[-1] = argv[-1].rstrip("\n")
if argv[0] == "echo" and len(argv) > 1:
retmsg = self._myservice._ptr().echo(argv[1])
print "echo() return: ", retmsg
return self.super__onExecute(ec_id)
if argv[0] == "set_value" and len(argv) > 1:
val = float(argv[1])
self._myservice._ptr().set_value(val)
print "Set remote value: ", val
return self.super__onExecute(ec_id)
if argv[0] == "get_value":
retval = self._myservice._ptr().get_value()
print "Current remote value: ", retval
return self.super__onExecute(ec_id)
if argv[0] == "get_echo_history":
echo_history = self._myservice._ptr().get_echo_history()
for i in range(len(echo_history)):
print repr(i) + ": " + echo_history[i]
return self.super__onExecute(ec_id)
if argv[0] == "get_value_history":
value_history = self._myservice._ptr().get_value_history()
print value_history, len(value_history)
for i in range(len(value_history)):
print repr(i) + ": " + repr(value_history[i])
return self.super__onExecute(ec_id)
print "Invalid command or argument(s)."
return self.super__onExecute(ec_id)