Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion modules/dtls_gw/rtpp_dtls_gw.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ rtpp_dtls_gw_setup_sender(struct rtpp_module_priv *pvt,
abort();
}

if (rtpp_create_listener(pvt->cfsp, dtls_strmp->laddr, &lport, fds) == -1)
if (rtpp_create_listener(pvt->cfsp, dtls_strmp->laddr, &lport, fds,
dtls_strmp->tos) == -1)
return (-1);
CALL_SMETHOD(pvt->cfsp->sessinfo, append, spa, sidx, fds);
CALL_METHOD(pvt->cfsp->rtpp_proc_cf, nudge);
Expand Down
169 changes: 130 additions & 39 deletions python/RPTL/rptl_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,29 @@

from parsimonious.grammar import Grammar
from parsimonious.nodes import Node, NodeVisitor
from sippy.Rtp_proxy.Client.stream import Rtp_proxy_client_stream
from sippy.Rtp_proxy.Client.Worker.internal import RTPPLWorker_internal

# Define the grammar for the DSL syntax
grammar = Grammar(
r"""
program = (socket ws)+ (command ws?)+
socket = "socket" ws name ":" ws address (ws arr ws logfile)?
program = ws? stmt (ws stmt)* ws?
stmt = socket / command
socket = "socket" ws name ":" ws address (ws1 socket_params)? (ws1 arr ws logfile)?
command = name ":" ws action (ws output_spec)?
name = ~r"[.]?\w+"
output_spec = arr ws (trans_spec / var_name)
var_name = ~r"\w+"
trans_spec = func_name obra func_arg (comm ws func_arg)* cbra
func_arg = ~"[^,)]+"
func_name = ~"\w+"
socket_params = param (ws1 param)*
param = ~r"(?!->)\S+"
address = ~r"\S+"
logfile = ~r"\S+"
action = ~r"(.(?!->))*"
ws = ~r"\s*"
ws1 = ~r"[ \t]+"
arr = "->"
comm = ","
obra = "("
Expand All @@ -31,11 +37,17 @@ class CommandRunner():
rc = None
spath: str
outfile: str
proc = None

def __init__(self, socket_name, outfile):
def __init__(self, socket_name, outfile, params, variables):
from sippy.Rtp_proxy.client import Rtp_proxy_client
self.rc = Rtp_proxy_client({'_sip_address': '127.0.0.1'}, spath = socket_name,
nworkers = 4, no_version_check = True)
if socket_name == 'stdio:':
rtpproxy_bin = variables.get('RTPPROXY_BIN', 'rtpproxy')
self.rc = StdioRtpProxyClient(params, rtpproxy_bin = rtpproxy_bin)
self.proc = self.rc.proc
else:
self.rc = Rtp_proxy_client({'_sip_address': '127.0.0.1'}, spath = socket_name,
nworkers = 4, no_version_check = True)
self.spath = socket_name
self.outfile = outfile
if outfile:
Expand All @@ -47,6 +59,44 @@ def log(self, response):
self._outfd.write(response + '\n')
self._outfd.flush()

def shutdown(self):
if self.rc is None:
return
if hasattr(self.rc, 'shutdown'):
self.rc.shutdown()
self.rc = None

class StdioRtpProxyClient(Rtp_proxy_client_stream):
def __init__(self, extra_args, nworkers = 1, rtpproxy_bin = None):
import socket
import subprocess
if rtpproxy_bin is None:
rtpproxy_bin = 'rtpproxy'
self.worker_class = RTPPLWorker_internal
cmd = [rtpproxy_bin, '-f', '-s', 'stdio:'] + list(extra_args)
parent_sock, child_sock = socket.socketpair()
self._stdio_sock = parent_sock
self.proc = subprocess.Popen(cmd, stdin = child_sock, stdout = child_sock,
stderr = None, close_fds = True)
child_sock.close()
super().__init__({'_sip_address': '127.0.0.1'}, address = parent_sock,
bind_address = None, nworkers = nworkers, family = socket.AF_UNIX)

def shutdown(self):
super().shutdown()
if self._stdio_sock is not None:
self._stdio_sock.close()
self._stdio_sock = None
if self.proc is None:
return
if self.proc.poll() is None:
self.proc.terminate()
try:
self.proc.wait(timeout = 2.0)
except Exception:
self.proc.kill()
self.proc = None

class TransFunction():
name: str
args: list
Expand Down Expand Up @@ -90,8 +140,8 @@ class ScriptRunner():
internal_ops: dict
ex_info: ExceptionInfo = None

def __init__(self, sockets, commands):
self.sockets = sockets
def __init__(self, commands):
self.sockets = {}
self.commands = commands
self.variables = {}
self.internal_ops = {
Expand All @@ -102,12 +152,18 @@ def __init__(self, sockets, commands):
self.issue_next_cmd()

def issue_next_cmd(self):
if self.i_command == len(self.commands):
from sippy.Core.EventDispatcher import ED2
ED2.breakLoop()
return
cmd = self.commands[self.i_command]
self.i_command += 1
from sippy.Core.EventDispatcher import ED2
while True:
if self.i_command == len(self.commands):
ED2.breakLoop()
return
cmd = self.commands[self.i_command]
self.i_command += 1
if isinstance(cmd, SocketSpec):
self.sockets[cmd.name] = CommandRunner(cmd.address, cmd.outfile,
cmd.params, self.variables)
continue
break
command = self.expand_vars(cmd.action)
if cmd.socket_name.startswith('.'):
self.internal_ops[cmd.socket_name](cmd, command)
Expand Down Expand Up @@ -185,31 +241,55 @@ class Command():
action: str
output_var: str = None

class SocketSpec():
name: str
address: str
params: list
outfile: str

def __init__(self, name, address, params, outfile):
self.name = name
self.address = address
self.params = params
self.outfile = outfile

# Define a class to visit the nodes in the parse tree
class DSLVisitor(NodeVisitor):
def __init__(self):
self.sockets = {}
self.rules = []

def visit_socket(self, node, children):
_, _, name, _, _, address, rest = children
_, _, name, _, _, address, params_group, log_group = children
params = []
if not isinstance(params_group, Node) and len(params_group) > 0:
params = self._find_params(params_group) or []
outfile = None
if not isinstance(rest, Node) and len(rest) > 0:
assert(len(rest) == 1)
_, _, _, _outfile = rest[0]
outfile = _outfile.text
self.sockets[name.text] = CommandRunner(address.text, outfile)

def visit_command_old(self, node, children):
cmd = Command()
socket_name, _, _, action, rest = children
cmd.socket_name = socket_name.text
cmd.action = action.text
if not isinstance(rest, Node) and len(rest) > 0:
assert(len(rest) == 1)
_, _, _, var_name = rest[0]
cmd.output_var = var_name.text
self.rules.append(cmd)
if not isinstance(log_group, Node) and len(log_group) > 0:
outfile = self._find_logfile(log_group)
self.rules.append(SocketSpec(name.text, address.text, params, outfile))

def visit_socket_params(self, node, children):
return node.text.split()

def _find_params(self, obj):
if isinstance(obj, list):
if obj and all(isinstance(x, str) for x in obj):
return obj
for item in obj:
found = self._find_params(item)
if found is not None:
return found
return None

def _find_logfile(self, obj):
if isinstance(obj, Node) and obj.expr_name == 'logfile':
return obj.text
if isinstance(obj, list):
for item in obj:
found = self._find_logfile(item)
if found is not None:
return found
return None

def visit_command(self, node, children):
cmd = Command()
Expand Down Expand Up @@ -252,13 +332,13 @@ def parse_dsl(dsl_code):
def execute_dsl(parse_tree):
visitor = DSLVisitor()
visitor.visit(parse_tree)
return visitor.sockets, visitor.rules
return visitor.rules

test_dsl = """socket ALICE: /tmp/abc.sock
socket BOB: udp:192.168.221.1:38282
socket CHARLIE: tcp:127.0.0.1:32323 -> CHARLIE.rout
.set: --CALLID-- -> CALLID
.set: 0.01 -> ICD
.eval: --CALLID-- -> CALLID
.eval: 0.01 -> ICD
ALICE: U %%CALLID%% 127.0.0.1 12345 from_tag_1 -> PORT
ALICE: U %%CALLID%% 127.0.0.1 12345 from_tag_1 -> str_split(aa, PORT, FOO)
.sleep: %%ICD%%
Expand All @@ -274,11 +354,16 @@ def execute_dsl(parse_tree):

#ALICE: U %%CALLID%% 127.0.0.1 12345 from_tag_1 -| str_split(aa, PORT, FOO)

def usage(rc = 0):
sys.stderr.write('usage: rptl_run.py [-s script.rptl] [-S sippy_path]\n')
sys.stderr.flush()
sys.exit(rc)

if __name__ == '__main__':
try:
opts, args = getopt.getopt(sys.argv[1:], 's:S:')
opts, args = getopt.getopt(sys.argv[1:], 's:S:h')
except getopt.GetoptError:
usage()
usage(2)

dsl_text = test_dsl
sippy_path = None
Expand All @@ -289,20 +374,26 @@ def execute_dsl(parse_tree):
if o == '-S':
sippy_path = a.strip()
continue
if o == '-h':
usage(0)

if sippy_path != None:
sys.path.insert(0, sippy_path)

tree = parse_dsl(dsl_text)
#print(tree)
ss, rs = execute_dsl(tree)
rs = execute_dsl(tree)
#for r in rs:
# print(F'socket = "socket = {ss[r.socket_name]}", action = "{r.action}", output_var = "{r.output_var}"')

from sippy.Core.EventDispatcher import ED2

srun = ScriptRunner(ss, rs)
ED2.loop()
srun = ScriptRunner(rs)
try:
ED2.loop()
finally:
for socket in srun.sockets.values():
socket.shutdown()
if srun.ex_info:
m = F'Failed action: "{srun.ex_info.cmd.action}"\n\texpanded: "{srun.ex_info.command}"\n'
sys.stderr.write(m)
Expand Down
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ BASE_SOURCES=main.c rtp.h rtpp_server.c \
$(CMDSRCDIR)/rpcpv1_norecord.c $(CMDSRCDIR)/rpcpv1_norecord.h \
$(CMDSRCDIR)/rpcpv1_ul_subc.c $(CMDSRCDIR)/rpcpv1_ul_subc.h \
$(RTPP_AUTOSRC_SOURCES) rtpp_epoll.c rtpp_str.c rtpp_str.h \
rtpp_sbuf.c rtpp_sbuf.h rtpp_refproxy.c rtpp_command_reply.c
rtpp_sbuf.c rtpp_sbuf.h rtpp_refproxy.c rtpp_command_reply.c \
$(CMDSRCDIR)/rpcpv1_ul_subc_set.c $(CMDSRCDIR)/rpcpv1_ul_subc_set.h
BASE_SOURCES+=$(ADV_DIR)/packet_observer.h $(ADV_DIR)/pproc_manager.c \
$(ADV_DIR)/pproc_manager.h
BASE_SOURCES+=rtpp_modman.c rtpp_module_if_static.c rtpp_module_if_static.h
Expand Down
Loading
Loading