/* 2. mod_ccpaas.c */

#include <switch.h>  
#include <switch_stun.h>  

/* Map zstr() onto _zstr(); NOTE(review): presumably the FreeSWITCH headers already provide this — confirm. */
#define zstr(x) _zstr(x)

/* Forward declarations of the module entry points and registration of the module
 * under the name mod_ccpaas with its load, shutdown and runtime handlers. */
SWITCH_MODULE_LOAD_FUNCTION(mod_ccpaas_load);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_ccpaas_runtime);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_ccpaas_shutdown);
SWITCH_MODULE_DEFINITION(mod_ccpaas, mod_ccpaas_load, mod_ccpaas_shutdown, mod_ccpaas_runtime);

/* File-scope API interface pointer.
 * NOTE(review): mod_ccpaas_load declares a local with the same name that shadows
 * this one, so nothing visible in this file ever writes this global. */
switch_api_interface_t *api_interface;

/*
 * Module-wide state.
 *
 * Only a handful of members are referenced by the code in this file:
 * hostname, dbname, odbc_dsn, sql_mutex and debug. The remaining members are
 * not touched here (the layout looks inherited from another module —
 * NOTE(review): confirm and prune).
 */
static struct {
    switch_hash_t *caller_orig_hash;
    switch_hash_t *consumer_orig_hash;
    switch_hash_t *bridge_hash;
    switch_hash_t *use_hash;
    switch_mutex_t *use_mutex;
    switch_mutex_t *caller_orig_mutex;
    switch_mutex_t *consumer_orig_mutex;
    switch_mutex_t *bridge_mutex;
    switch_hash_t *fifo_hash;
    switch_mutex_t *mutex;
    /* Passed to the SQL helpers as their optional lock; no code in this file creates it. */
    switch_mutex_t *sql_mutex;
    switch_memory_pool_t *pool;
    int running;
    switch_event_node_t *node;
    /* Filled from switch_core_get_switchname() when the module loads. */
    char hostname[256];
    /* Fallback DSN used when odbc_dsn is empty; never assigned in this file. */
    char *dbname;
    /* DSN read from the "odbc-dsn" configuration parameter. */
    char odbc_dsn[1024];
    int node_thread_running;
    switch_odbc_handle_t *master_odbc;
    int threads;
    switch_thread_t *node_thread;
    /* Values above 1 make ccpaas_execute_sql_callback log each statement; never assigned in this file. */
    int debug;
    struct fifo_node *nodes;
    char *pre_trans_execute;
    char *post_trans_execute;
    char *inner_pre_trans_execute;
    char *inner_post_trans_execute;
    switch_sql_queue_manager_t *qm;
    int allow_transcoding;
    switch_bool_t delete_all_members_on_startup;
} globals;

/*
 * Caller-supplied result buffer handed to SQL row callbacks as their
 * user-data pointer. ccpaas_outbound_sql fills buf/len; the row callback in
 * this file (sql2str_callback) does not read any of these members.
 */
struct callback {
    char *buf;      /* destination buffer */
    size_t len;     /* size of buf in bytes */
    int matches;    /* not written by any code in this file */
};
typedef struct callback callback_t;
/*
 * Per-call work item. sql2str_callback allocates one from a fresh memory pool
 * for every pending row and hands it to ccpass_thread, which destroys the pool
 * (and therefore this object) when the call attempt is finished.
 */
struct cc_paas_call_obj {
    char aleg[64];          /* not written by any code in this file */
char gateway[32];           /* gateway name inserted into the sofia/gateway/... dial string */
    char exten[64];         /* extension the answered call is transferred to */
char dp[32];                /* not written in this file; ccpass_thread always uses "XML" */
char context[32];           /* dialplan context for the transfer */
char cid_name[32];          /* caller-ID name passed to the originate call */
char cid_num[32];           /* caller-ID number passed to the originate call */
char number[15];            /* destination number; NOTE(review): holds at most 14 characters — confirm that is enough */
char uid[32];               /* always left empty by the code in this file */
char task_table[32];        /* task identifier (task_table column) */
char task_tabke_id[32];     /* row identifier; spelling matches the task_tabke_id column used in the SQL */
switch_memory_pool_t *pool; /* pool that owns this object */
};

/**
 * Obtain a cached database handle for this module.
 *
 * The DSN is globals.odbc_dsn when it is non-empty, otherwise globals.dbname.
 * No code in this file assigns globals.dbname, so when no "odbc-dsn" parameter
 * was configured the DSN would be NULL; that case is rejected here instead of
 * being passed on to the core.
 *
 * @return a handle the caller must give back with
 *         switch_cache_db_release_db_handle(), or NULL when no DSN is
 *         configured or the handle could not be obtained.
 */
switch_cache_db_handle_t *ccpaas_get_db_handle(void)
{
    switch_cache_db_handle_t *dbh = NULL;
    char *dsn = !zstr(globals.odbc_dsn) ? globals.odbc_dsn : globals.dbname;

    /* Callers already log the failure, so just report "no handle". */
    if (zstr(dsn)) {
        return NULL;
    }

    if (switch_cache_db_get_db_handle_dsn(&dbh, dsn) != SWITCH_STATUS_SUCCESS) {
        dbh = NULL;
    }

    return dbh;
}

/**
 * Run a SQL statement and invoke a callback for every result row.
 *
 * @param mutex    optional lock held for the duration of the call (may be NULL)
 * @param sql      statement to execute
 * @param callback row callback
 * @param pdata    opaque pointer forwarded to the callback
 * @return SWITCH_TRUE when the statement executed without error, SWITCH_FALSE
 *         when no database handle was available or the execution failed.
 *         (Previously the function returned SWITCH_FALSE unconditionally.)
 */
static switch_bool_t ccpaas_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata)
{
    switch_bool_t ret = SWITCH_FALSE;
    switch_status_t status;
    char *errmsg = NULL;
    switch_cache_db_handle_t *dbh = NULL;

    if (mutex) {
        switch_mutex_lock(mutex);
    }

    if (!(dbh = ccpaas_get_db_handle())) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
        goto end;
    }

    if (globals.debug > 1) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "sql: %s\n", sql);
    }

    status = switch_cache_db_execute_sql_callback(dbh, sql, callback, pdata, &errmsg);

    if (errmsg) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
        free(errmsg);
    } else if (status == SWITCH_STATUS_SUCCESS) {
        ret = SWITCH_TRUE;
    }

end:

    /* The handle is released on every path; it is still NULL when acquisition failed. */
    switch_cache_db_release_db_handle(&dbh);

    if (mutex) {
        switch_mutex_unlock(mutex);
    }

    return ret;
}

/**
 * Execute a SQL statement that produces no result rows.
 *
 * @param sql   statement to execute
 * @param mutex optional lock held for the duration of the call (may be NULL)
 * @return the status reported by switch_cache_db_execute_sql(), or
 *         SWITCH_STATUS_FALSE when no database handle could be obtained.
 */
static switch_status_t cc_execute_sql(char *sql, switch_mutex_t *mutex)
{
    switch_status_t result = SWITCH_STATUS_FALSE;
    switch_cache_db_handle_t *handle = NULL;

    if (mutex) {
        switch_mutex_lock(mutex);
    }

    handle = ccpaas_get_db_handle();
    if (handle) {
        result = switch_cache_db_execute_sql(handle, sql, NULL);
    } else {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
    }

    /* Released on both paths, exactly as the handle was acquired (or not). */
    switch_cache_db_release_db_handle(&handle);

    if (mutex) {
        switch_mutex_unlock(mutex);
    }

    return result;
}

/*
 * Record a failed call attempt.
 *
 * Sets call_status to '3' on the matching row of uncall_task.ccpaas_run_<task_id>
 * and then deletes the corresponding row from uncall_task.phone_memory.
 * The results of the two statements are not checked. Always returns 1.
 */
static int cc_task_update_fail(char *task_id, char *run_id)
{
    char *update_stmt = switch_mprintf("update uncall_task.ccpaas_run_%s set call_status = '3' where id = '%s'", task_id, run_id);
    char *delete_stmt = switch_mprintf("delete from `uncall_task`.`phone_memory` where task_table='%s' and task_tabke_id='%s' ;", task_id, run_id);

    cc_execute_sql(update_stmt, globals.sql_mutex);
    switch_safe_free(update_stmt);

    cc_execute_sql(delete_stmt, globals.sql_mutex);
    switch_safe_free(delete_stmt);

    return 1;
}

#define ORIGINATE_SYNTAX "<call url> <exten>|&<application_name>(<app_args>) [<dialplan>] [<context>] [<cid_name>] [<cid_num>] [<timeout_sec>]"
static int cc_outbound_task(char *aleg, char *exten , char *dp, char *context, char *cid_name , char *cid_num , char *number , char *task_id, char *run_id){
    
switch_channel_t *caller_channel;
    switch_core_session_t *caller_session = NULL;
    
    uint32_t timeout = 30;
    switch_call_cause_t cause = SWITCH_CAUSE_NORMAL_CLEARING;
    switch_status_t status = SWITCH_STATUS_SUCCESS;
    if (switch_ivr_originate_ccpaas(NULL, &caller_session, &cause, aleg, timeout, NULL, cid_name, cid_num, NULL, NULL, SOF_NONE, NULL, task_id , run_id) != SWITCH_STATUS_SUCCESS
        || !caller_session) {
//-ERR call fail
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, " --> ccpaas ---------->+Fail %s_ %s \n",task_id,run_id);
        cc_task_update_fail(task_id,run_id);
goto done;
    }
    caller_channel = switch_core_session_get_channel(caller_session);
    switch_ivr_session_transfer(caller_session, exten, dp, context);
    switch_core_session_rwunlock(caller_session);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"ccpaas---------->+OK %s\n", switch_core_session_get_uuid(caller_session));

done:
    return status;
}

/**
 * Worker thread body: place one outbound call described by a cc_paas_call_obj.
 *
 * Builds the sofia gateway dial string from the work item, runs
 * cc_outbound_task() and finally destroys the memory pool that owns the work
 * item, so the object must not be used after this function returns.
 *
 * @param thread thread handle (unused)
 * @param obj    struct cc_paas_call_obj allocated from its own pool
 * @return always NULL
 */
static void *SWITCH_THREAD_FUNC ccpass_thread(switch_thread_t *thread, void *obj)
{
    struct cc_paas_call_obj *call_obj = (struct cc_paas_call_obj *) obj;
    switch_memory_pool_t *pool = call_obj->pool;
    char *dp = "XML";
    /* 52 bytes of fixed text plus the four variable fields (at most 31+31+31+14
     * characters) need 160 bytes including the terminator; the previous
     * 128-byte buffer could silently truncate the dial string. */
    char aleg[256] = { '\0' };

    snprintf(aleg, sizeof(aleg), "{ignore_early_media=true,ccpaas_id=%s_%s}sofia/gateway/%s/%s",
             call_obj->task_table, call_obj->task_tabke_id, call_obj->gateway, call_obj->number);

    cc_outbound_task(aleg, call_obj->exten, dp, call_obj->context, call_obj->cid_name, call_obj->cid_num,
                     call_obj->number, call_obj->task_table, call_obj->task_tabke_id);

    /* call_obj lives inside this pool; destroying the pool releases it. */
    switch_core_destroy_memory_pool(&pool);

    return NULL;
}

static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames)
{
    //callback_t *cbt = (callback_t *) pArg; 
    //switch_copy_string(cbt->buf, argv[0], cbt->len);  
    switch_thread_t *thread;
    switch_threadattr_t *thd_attr;
    switch_memory_pool_t *pool = NULL;
    struct cc_paas_call_obj *call_obj;
char *sql_mark;
char number[32]={'\0'};
char uid[32]={'\0'};
char gateway[32]={'\0'};
char exten[32]={'\0'};
char context[32]={'\0'};
char cid_name[32]={'\0'};
char cid_num[32]={'\0'};
char task_table[32]={'\0'};
char task_tabke_id[32]={'\0'};
snprintf(number,sizeof(number),"%s",argv[0]);
snprintf(gateway,sizeof(gateway),"%s",argv[1]);
snprintf(exten,sizeof(exten),"%s",argv[2]);
snprintf(context,sizeof(context),"%s",argv[3]);
snprintf(cid_name,sizeof(cid_name),"%s",argv[4]);
snprintf(cid_num,sizeof(cid_num),"%s",argv[5]);
snprintf(task_table,sizeof(task_table),"%s",argv[6]);
snprintf(task_tabke_id,sizeof(task_tabke_id),"%s",argv[7]);

    switch_core_new_memory_pool(&pool);
    switch_threadattr_create(&thd_attr, pool);
    switch_threadattr_detach_set(thd_attr, 1);
    /* TBD figure out how much space we need by looking at the xml_t when stacksize == 0 */
    switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
    call_obj = switch_core_alloc(pool, sizeof(*call_obj));
switch_copy_string(call_obj->number, number, sizeof(call_obj->number));
switch_copy_string(call_obj->uid, uid, sizeof(call_obj->uid));
switch_copy_string(call_obj->gateway,gateway,sizeof(gateway));
switch_copy_string(call_obj->exten,exten,sizeof(exten));
switch_copy_string(call_obj->context,context,sizeof(context));
switch_copy_string(call_obj->cid_name,gateway,sizeof(cid_name));
switch_copy_string(call_obj->cid_num,cid_num,sizeof(cid_num));
switch_copy_string(call_obj->task_table,task_table,sizeof(task_table));
switch_copy_string(call_obj->task_tabke_id,task_tabke_id,sizeof(task_tabke_id));
call_obj->pool = pool;
sql_mark = switch_mprintf("update `uncall_task`.`phone_memory` set send ='1' where task_table = '%s' and task_tabke_id = '%s';",task_table,task_tabke_id);
    
cc_execute_sql(sql_mark, globals.sql_mutex);
    switch_safe_free(sql_mark);
    switch_thread_create(&thread, thd_attr, ccpass_thread, call_obj, pool);
switch_yield(1000);
    return 0;
}

/**
 * Open the "fifo.conf" configuration and read the module settings from it.
 *
 * Only the "odbc-dsn" parameter is honoured: a non-empty value is stored in
 * globals.odbc_dsn when ODBC or PostgreSQL support is available, otherwise an
 * error is logged.
 *
 * @param xml receives the opened XML root (NULL when opening failed)
 * @param cfg receives the configuration node
 * @return SWITCH_STATUS_TERM when the configuration cannot be opened,
 *         SWITCH_STATUS_SUCCESS otherwise.
 */
static switch_status_t read_config_file(switch_xml_t *xml, switch_xml_t *cfg) {
    const char *cf = "fifo.conf";
    switch_xml_t settings;
    switch_xml_t param;

    *xml = switch_xml_open_cfg(cf, cfg, NULL);
    if (!*xml) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", cf);
        return SWITCH_STATUS_TERM;
    }

    settings = switch_xml_child(*cfg, "settings");
    if (!settings) {
        return SWITCH_STATUS_SUCCESS;
    }

    for (param = switch_xml_child(settings, "param"); param; param = param->next) {
        char *name = (char*)switch_xml_attr_soft(param, "name");
        char *value = (char*)switch_xml_attr_soft(param, "value");

        /* Skip everything except a non-empty odbc-dsn parameter. */
        if (strcasecmp(name, "odbc-dsn") != 0 || zstr(value)) {
            continue;
        }

        if (switch_odbc_available() || switch_pgsql_available()) {
            switch_set_string(globals.odbc_dsn, value);
        } else {
            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
        }
    }

    return SWITCH_STATUS_SUCCESS;
}

/*
 * Query uncall_task.phone_memory for rows that have not been sent yet
 * (send='0') and hand every row to sql2str_callback.
 * The result of the query helper is not inspected.
 */
static void ccpaas_outbound_sql()
{
    char result_buf[80] = "";
    callback_t cb_data = { 0 };
    char *query = switch_mprintf("select phone, gateway, exten, context, cid_name, cid_num, task_table, task_tabke_id, company_code from `uncall_task`.`phone_memory` where send='0'");

    cb_data.buf = result_buf;
    cb_data.len = sizeof(result_buf);

    ccpaas_execute_sql_callback(globals.sql_mutex, query, sql2str_callback, &cb_data);
    switch_safe_free(query);
}

/*
 * Handler for the "ccpass" API command: writes a fixed version banner to the
 * output stream and reports success. Any command arguments are ignored.
 */
SWITCH_STANDARD_API(ccpass_api_function)
{
    stream->write_function(stream, "CCPAAS , V 1.0 \n");

    return SWITCH_STATUS_SUCCESS;
}


SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_ccpaas_shutdown){
return SWITCH_STATUS_SUCCESS;
}
SWITCH_MODULE_LOAD_FUNCTION(mod_ccpaas_load){
switch_xml_t xml, cfg;
switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_cache_db_handle_t *dbh = NULL;
switch_api_interface_t *api_interface;
strncpy(globals.hostname, switch_core_get_switchname(), sizeof(globals.hostname) - 1);

if ((status = read_config_file(&xml, &cfg)) != SWITCH_STATUS_SUCCESS) return status;

if (!(dbh = ccpaas_get_db_handle())) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Cannot open DB!\n");
}
/* connect my internal structure to the blank pointer passed to me */  
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "CC-PAAS \n");
//parse_config(SWITCH_FALSE);  
SWITCH_ADD_API(api_interface , "ccpass" , "ccpass of status", ccpass_api_function, "[name]");
/* indicate that the module should continue to be loaded */  
return SWITCH_STATUS_SUCCESS;
}


SWITCH_MODULE_RUNTIME_FUNCTION(mod_ccpaas_runtime){
while(1) {
ccpaas_outbound_sql();
switch_yield(100000);
}
}
/*
 * NOTE(review): both macros below are defined after their uses earlier in this
 * file (switch_set_string in read_config_file, switch_cache_db_get_db_handle_dsn
 * in ccpaas_get_db_handle), so these definitions cannot affect those uses.
 * Presumably the FreeSWITCH headers already provide them — confirm and drop
 * these lines if so.
 */
#define switch_set_string(_dst, _src) switch_copy_string(_dst, _src, sizeof(_dst))
#define switch_cache_db_get_db_handle_dsn(_a, _b) _switch_cache_db_get_db_handle_dsn(_a, _b, __FILE__, __SWITCH_FUNC__, __LINE__)