quinta-feira, 8 de outubro de 2015

Asterisk AMI


O Asterisk é um sistema de PABX Voip OpenSource. Ele possui uma interface chamada AMI (Asterisk Manager Interface). Basicamente no protocolo AMI existem três tipos de pacotes definidos (Action,Event,Respose).
O pacote Action serve para enviar um comando do cliente AMI  para o servidor AMI, cada comando pode ter parâmetros diferentes especificados pelo protocolo.
O pacote Response é enviado do servidor para o cliente como uma responta para uma Action, podendo ser uma resposta indicando uma falha ou um sucesso. Possui dois campos principais, o estado da resposta (Sucess ou Error) e uma mensagem, no caso de erros a mensagem normalmente expõe o motivo do erro.
O pacote Event é enviado do servidor para o cliente, para notificar eventos ocorridos no servidor, como por exemplo o inicio ou o fim de uma ligação. É possível enviar um pacote Action para o servidor indicando os tipos de eventos que o cliente gostaria de ser notificado.
Abaixo é demonstrado um exemplo de conexão com o AMI, o que está em verde são mensagens do cliente para o servidor e o que está em vermelho do servidor para o cliente.


Action: Login
Username: admin
Secret: senha123

Response: Success
Message: Authentication accepted

Action: Events
EventMask: call

Event: FullyBooted
Privilege: system,all
Status: Fully Booted

Response: Success
Events: On

Action: Logoff

Response: Goodbye
Message: Thanks for all the fish.

Em alguns trabalhos que realizei precisei fazer uma conexão com o asterisk utilizando esta interface. Encontrei poucas bibliotecas para auxiliar neste trabalho e não me agradaram muito. Então resolvi criar minha própria biblioteca, ela ficou bem simples e um pouco diferente das outras bibliotecas que encontrei por ai. Talvez em um futuro não muito distante eu melhore essa biblioteca para tentar publica-lá em um repositório, mas por enquanto vou disponibilizá-la por aqui mesmo. 

import re
import socket
import threading
from functools import partial
import time


class Action(object):
    def __init__(self, name, keys={}, variables={}):
        self.name = name
        self.keys = keys
        self.variables = variables

    def __str__(self):
        package = "Action: %s\r\n" % self.name
        for key in self.keys:
            package += "%s: %s\r\n" % (key, self.keys[key])
        for var in self.variables:
            package += "Variable: %s=%s\r\n" % (var, self.variables[var])
        return package

    def __getattr__(self, item):
        if item in ('name', 'keys', 'variables'):
            return object.__getattr__(self, item)
        return self.keys[item]

    def __setattr__(self, key, value):
        if key in ('name', 'keys', 'variables'):
            return object.__setattr__(self, key, value)
        self.keys[key] = value

    def __setitem__(self, key, value):
        self.variables[key] = value

    def __getitem__(self, item):
        return self.variables[item]


class Response(object):
    match_regex = re.compile('^Response: .*', re.IGNORECASE)

    @staticmethod
    def read(response):
        lines = str(response).splitlines()
        (key, value) = lines[0].split(": ", 1)
        if not key.lower() == 'response':
            raise Exception()
        status = value
        keys = {}
        for i in range(1, len(lines)):
            (key, value) = lines[i].split(": ", 1)
            keys[key] = value
        return Response(status, keys)

    @staticmethod
    def match(response):
        return bool(Response.match_regex.match(str(response)))

    def __init__(self, status, keys):
        self.status = status
        self.keys = keys

    def __str__(self):
        package = "Response: %s\r\n" % self.status
        for key in self.keys:
            package += "%s: %s\r\n" % (key, self.keys[key])
        return package


class FutureResponse(object):
    def __init__(self, timeout=None):
        self.timeout = timeout
        self._response = None
        self._lock = threading.Condition()

    def set_response(self, response):
        self._lock.acquire()
        self._response = response
        self._lock.notifyAll()
        self._lock.release()

    def get_response(self):
        if self._response is not None:
            return self._response
        self._lock.acquire()
        self._lock.wait(self.timeout)
        self._lock.release()
        return self._response

    response = property(get_response, set_response)


class Event(object):
    match_regex = re.compile('^Event: .*', re.IGNORECASE)

    @staticmethod
    def read(event):
        lines = str(event).splitlines()
        (key, value) = lines[0].split(': ', 1)
        if not key.lower() == 'event':
            raise Exception()
        name = value
        keys = {}
        for i in range(1, len(lines)):
            (key, value) = lines[i].split(': ', 1)
            keys[key] = value
        return Event(name, keys)

    @staticmethod
    def match(event):
        return bool(Event.match_regex.match(str(event)))

    def __init__(self, name, keys):
        self.name = name
        self.keys = keys

    def __str__(self):
        return 'Event : %s -> %s' % (self.name,self.keys)


class SimpleAction(Action):
    def __init__(self, name, **kwargs):
        Action.__init__(self, name=name, keys=kwargs)


class LoginAction(Action):
    def __init__(self, username, secret):
        Action.__init__(self, name='Login', keys={'Username': username, 'Secret': secret})


class LogoffAction(Action):
    def __init__(self):
        Action.__init__(self, name='Logoff', keys={})


class AMIClient(object):
    action_counter = 0
    asterisk_start_regex = re.compile('^Asterisk *Call *Manager/(?P<version>([0-9]+\.)*[0-9]+)', re.IGNORECASE)

    _futures = {}
    _event_listeners = []

    def __init__(self, address, port, buffer_size=1025):
        self.listeners = []
        self.address = address
        self.buffer_size = buffer_size
        self.port = port
        self.socket = None
        self._thread = None
        self._on = False
        self.ami_version = None

    def next_action_id(self):
        id = self.action_counter
        self.action_counter += 1
        return str(id)

    def connect(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.address, self.port))
        self._on = True
        self._thread = threading.Thread(target=self.listen)
        self._thread.start()

    def login(self, username, secret):
        if not self._on:
            self.connect()
        return self.send_action(LoginAction(username, secret))

    def logoff(self):
        if not self._on:
            return
        return self.send_action(LogoffAction())

    def send_action(self, action):
        if 'ActionID' not in action.keys:
            action_id = self.next_action_id()
            action.keys['ActionID'] = action_id
        else:
            action_id = action.keys['ActionID']
        future = FutureResponse()
        self._futures[action_id] = future
        self.send(action)
        return future

    def send(self, pack):
        self.socket.send(str(pack) + "\r\n")

    def listen(self):
        asterisk_start = self.socket.recv(self.buffer_size)
        match = AMIClient.asterisk_start_regex.match(asterisk_start)
        if not match:
            raise Exception()
        self.ami_version = match.group('version')
        pack = ""
        while self._on:
            data = self.socket.recv(self.buffer_size)
            if not data:
                continue
            pack += data
            if not (pack.endswith("\r\n\r\n") or pack.endswith("\n\n")):
                continue
            self.fire_recv_pack(pack)
            pack = ""
        self.socket.close()

    def fire_recv_reponse(self, response):
        if response.status.lower() == 'goodbye':
            self._on = False
        if 'ActionID' not in response.keys:
            return
        action_id = response.keys['ActionID']
        if action_id not in self._futures:
            return
        future = self._futures.pop(action_id)
        future.response = response

    def fire_recv_event(self, event):
        for listener in self._event_listeners:
            listener(event=event, source=self)

    def fire_recv_pack(self, pack):
        if pack.endswith('\r\n\r\n'):
            pack = pack[0: - 4]
        if pack.endswith('\n\n'):
            pack = pack[0:- 2]
        if Response.match(pack):
            response = Response.read(pack)
            self.fire_recv_reponse(response)
        if Event.match(pack):
            event = Event.read(pack)
            self.fire_recv_event(event)

    def add_event_listener(self, event_listener):
        self._event_listeners.append(event_listener)

    def remove_event_listener(self, event_listener):
        self._event_listeners.remove(event_listener)


class AMIClientAdapter(object):
    def __init__(self, ami_client):
        self._ami_client = ami_client

    def _action(self, name, variables={}, **kwargs):
        action = Action(name, kwargs)
        action.variables = variables
        return self._ami_client.send_action(action)

    def __getattr__(self, item):
        return partial(self._action, item)

Vou tentar resumir a utilização desta biblioteca.

Ela possui classes responsáveis em representar os três tipos de pacotes (Action,Response, Event), uma classe para fazer a conexão (AMIClient) , uma classe para facilitar o uso da conexão (AMIClienteAdapter) e uma classe encapsular as respostas (FutureResponse).
A AMIClient quando envia um pacote de ação para o servidor, enumera esta ação utilizando a chave ActionID, por padrão o servidor envia a resposta com a mesma enumeração, assim é possível identificar o par de ação e resposta.


Ao se enviar uma ação com a AMIClient é retornado uma future uma promessa de resposta (FutureResponse), desta forma é possível enviar ações de forma assíncrona ou aguardar a resposta da ação ao acessar o atributo response da future.


client = AMIClient('192.168.0.124', 5038)
future = client.login(username='admin', secret='senha123')
#aguarda a resposta do login
reponse = future.response
future = client.logoff()
#fim eh impresso mesmo sem a resposta de logoff
print "fim"

A classe AMIClientAdapter ajuda a reduzir a quantidade de código, veja a seguir um exemplo de código que inicia uma ligação sem usar o adapter e depois um exemplo utilizando o adapter.

client = AMIClient('192.168.0.124', 5038)
future = client.login(username='admin', secret='senha123')
print future.response
action = SimpleAction(
    'Originate',
    Channel='SIP/101',
    Context='default',
    Exten='101',
    CallerID='Teste123',
    Timeout='5000',
    Priority='1'
)
future = client.send_action(action)
print future.response
client.logoff()
client = AMIClient('192.168.0.124', 5038)
future = client.login(username='admin', secret='senha123')
print future.response
adapter = AMIClientAdapter(client)
future = adapter.Originate(
    Channel='SIP/101',
    Context='default',
    Exten='101',
    CallerID='Teste123',
    Timeout='5000',
    Priority='1'
)
print future.response
client.logoff()

Espero que esta biblioteca possa ser útil para alguém!
Obrigado!

15 comentários :

  1. Prezado amigo, vi este post que procuro a dias alguma coisa. Seria possível vc me dar umas dicas ? Essa conexão com asterisk via AMI estou tendo muita dor de cabeça.
    Obrigado.

    ResponderExcluir
  2. Você já testou AMI em chan_dongle para obter a reposta se ele foi enviado ou não ? estou sofrendo nessa captura de resposta! kkk Obrigado bom tutorial gostei! É difícil achar algo BR

    ResponderExcluir
    Respostas
    1. Nunca testei =/
      Mas vou dar uma pesquisada

      Excluir
    2. consegui resolver o problema do chan_dongle de outra maneira, porém cai novamente em seu post, como eu faço para monitorar os eventos ? não entendi muito bem!

      Excluir
    3. Se você estiver usando python eu coloquei esse código no github, também da para instalar com o pip
      https://github.com/ettoreleandrotognoli/python-ami
      Basicamente você vai criar uma conexão cliente e registrar os eventos que você quer monitorar e o que fazer com eles
      Aqui tem um exemplo de uma “bina”:
      https://github.com/ettoreleandrotognoli/python-ami/blob/master/examples/simple_ringing_notification.py
      Ai ele ouve o evento Newstate quando o ChannelStateDesc é Ringing

      Excluir
    4. Primeiramente muito obrigado pela resposta! Estou um pouco apanhando para essas action e events.
      vamos lá deixa eu te explicar o que eu preciso para ver se você pode me dar um norte.
      1. Estou utilizando o framework do python web2py.
      2. Estou trabalhando com agentes na fila(outro problema que tenho que solucionar pois só conseguir fazer ele logar por dialplan queria fazer ele logar por AMI sem precisar chamar um dialplan).
      3. Estou criando uma action "SIPpeer" para pegar o ramal do ip do agente para depois adiciona-lo na fila.
      3.1 Então eu crio uma action "SIPpeer"
      3.2 Dou o comando "client.add_event_listener(event_listenerr)" porém eu não consigo obter o resultado, como eu poderia fazer isso ? ja ativar a action e pegar o resultado do "SIP SHOW PEERS" ? acho que é essa minha maior dificuldade pegar o resultado da action assim que eu disparei ela.

      Tenho pouco tempo de experiência com AMI principalmente em python.
      Peço desculpas pelo incomodo.

      Excluir
    5. No caso do Sippeers a reposta da action não tem informação alguma mas dispara uma sequencia de eventos com as informações que você precisa.
      Você vai ter que adicionar um listener antes de mandar a action, esse listener vai ter que tratar os eventos e montar uma lista.
      Segue exemplo: https://pastebin.com/v20cQBji

      Excluir
    6. Este comentário foi removido pelo autor.

      Excluir
    7. Obrigado pela resposta, embora seu link não abriu, com base em sua explicação eu criei uma lista=[] e quando eu chamo a função def event_listener(event,**kwargs): armazeno o numero do ramal na lista.

      com isso consigo mostrar na view o ramal do usuário porém ele vem com [u'1012'].

      Porém eu não entendi porque usar a lista ? ela funciona como uma variavel global na controller ?

      Excluir
    8. Essa parte de controller imagino que seja do Web2Py que eu não conheço. A lista eu sugeri pois vão ser gerados vários eventos, um para cada peer.
      Essa formatação "[u'1012']" é como o python apresenta as listas, tente pegar só o primeiro elemento ( lista[0] ) ou fazer um join (",".join(lista))
      O link aqui está abrindo normal =/
      Tente este:
      https://gist.github.com/ettoreleandrotognoli/b9869a2bb8dd61e3e42b614710d95fba

      Excluir
    9. Este comentário foi removido pelo autor.

      Excluir
  3. Obrigado pela resposta, já estou estudando seu código e adequando o meu sistema.
    Terei muitos desafios, principalmente para controle desses eventos.
    Tenho que pegar tudo em tempo real, você ja deve ter feito algo do genero com asterisk ne ? tenho que pegar o agente na fila e apresentar para ele em tempo real, quanto tempo ele está logado em qual fila ele está, enviar notificação via desktop quando houver uma chamada entrante.

    Todos os eventos no asterisk tem um endpoint como no sippeers que tem o complete, porque pelo o que vi, você fez o controle do final da thread por ele ne, e se no caso for um event de Agent ?

    Mais uma vez obrigado pela força, desculpa qualquer coisa é porque estou bem cru mesmo.

    ResponderExcluir
  4. Boa tarde, estou utilizando sua biblioteca em meus projetos asterisk está sendo bem construtiva, porém estou com problemas quando envio muitas requisições, cerca de um tempo acaba aparecendo um valor "NONE" e meu script não roda mais. tenho um script que verifica ligações de 5 em 5 segundos, ele abre a conexão com asterisk e fecha a mesma, depois que pega os dados, você já passou por algo relacionado ?

    ResponderExcluir
    Respostas
    1. Assim de cabeça não me lembro de ter passado por isso.
      Qual versão da lib você está usando? Se você quiser cria uma issue no github:
      https://github.com/ettoreleandrotognoli/python-ami
      Coloca o máximo de informações que você puder ai eu vou tentar descobrir o problema.
      Obrigado!

      Excluir