2015年基于FreeSWITCH做的自动群呼系统,主要由监听模块、任务外呼模块及FIFO实现。



1、mod_cctask.c

#include <switch.h>  
#include <switch_stun.h>  

/* NOTE(review): zstr() is presumably already provided by <switch.h>; confirm and drop this line if so. */
#define zstr(x) _zstr(x)

/*
 * Module entry points.
 *
 * The only definition of mod_cctask_runtime in this file is commented out
 * (see the bottom of the file), so no runtime hook is registered: putting the
 * address of an undefined function into the module table would leave an
 * unresolved symbol. To bring the polling loop back, restore that definition
 * and pass mod_cctask_runtime as the last argument again.
 */
SWITCH_MODULE_LOAD_FUNCTION(mod_cctask_load);
SWITCH_MODULE_RUNTIME_FUNCTION(mod_cctask_runtime);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_cctask_shutdown);
SWITCH_MODULE_DEFINITION(mod_cctask, mod_cctask_load, mod_cctask_shutdown, NULL);

/* NOTE(review): mod_cctask_load declares a local of the same name, so this global is never assigned there. */
switch_api_interface_t *api_interface;

/*
 * Module-wide state.
 *
 * Only hostname, dbname, odbc_dsn, sql_mutex and debug are referenced by the
 * code visible in this file; the remaining members are not used here.
 * NOTE(review): nothing visible in this file initialises sql_mutex, pool or
 * dbname — confirm where (or whether) that happens.
 */
static struct {
    switch_hash_t *caller_orig_hash;
    switch_hash_t *consumer_orig_hash;
    switch_hash_t *bridge_hash;
    switch_hash_t *use_hash;
    switch_mutex_t *use_mutex;
    switch_mutex_t *caller_orig_mutex;
    switch_mutex_t *consumer_orig_mutex;
    switch_mutex_t *bridge_mutex;
    switch_hash_t *fifo_hash;
    switch_mutex_t *mutex;
    switch_mutex_t *sql_mutex; /* passed as the mutex argument to every SQL helper call in this file */
    switch_memory_pool_t *pool;
    int running;
    switch_event_node_t *node;
    char hostname[256]; /* filled from switch_core_get_switchname() in mod_cctask_load */
    char *dbname; /* used as the DSN when odbc_dsn is empty */
    char odbc_dsn[1024]; /* set from the "odbc-dsn" config param by read_config_file */
    int node_thread_running;
    switch_odbc_handle_t *master_odbc;
    int threads;
    switch_thread_t *node_thread;
    int debug; /* values above 1 make cctask_execute_sql_callback log each SQL statement */
    struct fifo_node *nodes;
    char *pre_trans_execute;
    char *post_trans_execute;
    char *inner_pre_trans_execute;
    char *inner_post_trans_execute;
    switch_sql_queue_manager_t *qm;
    int allow_transcoding;
    switch_bool_t delete_all_members_on_startup;
} globals;

/*
 * Caller-supplied buffer descriptor. cctask_outbound_sql fills buf/len and
 * passes the struct as the user-data pointer of its SQL row callback.
 */
struct callback {
    char *buf; /* start of the caller's buffer */
    size_t len; /* size of buf in bytes */
    int matches; /* not read or written by the code visible in this file */
};
typedef struct callback callback_t;


/*
 * Per-task dialing parameters handed to sql2str_callback_run_tables, which
 * copies them into every row it inserts into uncall_task.phone_memory.
 */
struct cc_cctask_call_obj {
char task_id[32]; /* task id; also the suffix of the ccpaas_run_<id> table name */
char gateway[64]; /* gateway */
char exten[64]; /* extension */
char context[64]; /* context selector */
char cid_name[64]; /* caller name */
char cid_num[64]; /* caller number */
char fifo_name[64]; /* FIFO name written to phone_memory.fifo_name */
};
typedef struct cc_cctask_call_obj cctask_call_obj;


/*
 * Obtain a cached DB handle.
 *
 * The DSN is globals.odbc_dsn when that string is non-empty, otherwise
 * globals.dbname.
 *
 * Returns the handle, or NULL when it could not be obtained.
 */
switch_cache_db_handle_t *cctask_get_db_handle(void)
{
    switch_cache_db_handle_t *handle = NULL;
    char *dsn = zstr(globals.odbc_dsn) ? globals.dbname : globals.odbc_dsn;

    if (switch_cache_db_get_db_handle_dsn(&handle, dsn) == SWITCH_STATUS_SUCCESS) {
        return handle;
    }

    return NULL;
}

/*
 * Run a query and feed each result row to `callback`.
 *
 * mutex    - optional lock held for the whole call (may be NULL).
 * sql      - statement to execute.
 * callback - invoked per row with `pdata` as its first argument.
 * pdata    - opaque pointer forwarded to the callback.
 *
 * Returns SWITCH_TRUE when a DB handle was obtained and the DB layer reported
 * no error message; SWITCH_FALSE otherwise. (Previously the result variable
 * was never updated, so the function returned SWITCH_FALSE unconditionally.)
 */
static switch_bool_t cctask_execute_sql_callback(switch_mutex_t *mutex, char *sql, switch_core_db_callback_func_t callback, void *pdata)
{
    switch_bool_t ret = SWITCH_FALSE;
    char *errmsg = NULL;
    switch_cache_db_handle_t *dbh = NULL;

    if (mutex) {
        switch_mutex_lock(mutex);
    }

    if (!(dbh = cctask_get_db_handle())) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
        goto end;
    }

    if (globals.debug > 1) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "sql: %s\n", sql);

    switch_cache_db_execute_sql_callback(dbh, sql, callback, pdata, &errmsg);

    if (errmsg) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
        free(errmsg);
    } else {
        ret = SWITCH_TRUE;
    }

end:

    /* dbh is still NULL here when the handle could not be opened. */
    switch_cache_db_release_db_handle(&dbh);

    if (mutex) {
        switch_mutex_unlock(mutex);
    }

    return ret;
}

/*
 * Run a query and copy its result into `resbuf` as a string.
 *
 * mutex  - optional lock held for the whole call (may be NULL).
 * sql    - statement to execute.
 * resbuf - destination buffer.
 * len    - size of resbuf in bytes.
 *
 * Returns whatever switch_cache_db_execute_sql2str returns, or NULL when no
 * DB handle could be obtained.
 *
 * Fixes over the previous version:
 *  - the mutex is now locked on entry (it used to be unlocked, which released
 *    a lock this function did not take and then unlocked it a second time);
 *  - &errmsg is passed to the DB layer so the error branch can actually fire.
 *
 * This helper is reached from sql2str_callback while
 * cctask_execute_sql_callback still holds the same mutex, so the mutex handed
 * in must be recursive (SWITCH_MUTEX_NESTED) — confirm where it is created.
 */
char *cc_execute_sql2str(switch_mutex_t *mutex, char *sql, char *resbuf, size_t len)
{
    char *ret = NULL;
    char *errmsg = NULL;
    switch_cache_db_handle_t *dbh = NULL;

    if (mutex) {
        switch_mutex_lock(mutex);
    }

    if (!(dbh = cctask_get_db_handle())) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
        goto end;
    }

    ret = switch_cache_db_execute_sql2str(dbh, sql, resbuf, len, &errmsg);
    if (errmsg) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL ERR: [%s] %s\n", sql, errmsg);
        free(errmsg);
    }

end:
    switch_cache_db_release_db_handle(&dbh);

    if (mutex) {
        switch_mutex_unlock(mutex);
    }

    return ret;
}


/*
 * Execute a statement that produces no result set.
 *
 * sql   - statement to execute.
 * mutex - optional lock held for the whole call (may be NULL).
 *
 * Returns the status reported by switch_cache_db_execute_sql, or
 * SWITCH_STATUS_FALSE when no DB handle could be obtained.
 */
static switch_status_t cc_execute_sql(char *sql, switch_mutex_t *mutex)
{
    switch_status_t result = SWITCH_STATUS_FALSE;
    switch_cache_db_handle_t *handle = NULL;

    if (mutex) {
        switch_mutex_lock(mutex);
    }

    handle = cctask_get_db_handle();
    if (handle) {
        result = switch_cache_db_execute_sql(handle, sql, NULL);
    } else {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB\n");
    }

    switch_cache_db_release_db_handle(&handle);

    if (mutex) {
        switch_mutex_unlock(mutex);
    }

    return result;
}


//从子表获取外呼数据
static int sql2str_callback_run_tables(void *pArg, int argc, char **argv, char **columnNames)
{
struct cc_cctask_call_obj *in = pArg;

char *sql;
char *sql_update;

sql = switch_mprintf("insert into `uncall_task`.`phone_memory` (`cid_num`, `exten`, `context`, `task_tabke_id`, `task_table`, `gateway`, `fifo_name`, `phone`, `cid_name`) values ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s') ",
in->cid_num,
in->exten,
in->context,
argv[0],
in->task_id,
in->gateway,
in->fifo_name,
argv[1],
in->cid_name
);

switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "-----> %s \n",sql);
cc_execute_sql(sql,globals.sql_mutex);
    switch_safe_free(sql);
sql_update = switch_mprintf("update `uncall_task`.`ccpaas_run_%s` set `call_time`='%ld' where `id`='%s'  ",
in->task_id,
(long) switch_epoch_time_now(NULL),
argv[0]
);
cc_execute_sql(sql_update, globals.sql_mutex);
    switch_safe_free(sql_update);

if(in){
switch_safe_free(in);
}
return 1;
}

//写入外呼任务
static int InsterMemoryTables(char *id, char *gateway, char *exten, char *context, char *cid_name, char *cid_num, char *fifo_name, int send_limit){

char *sql;
cctask_call_obj *call_obj = (cctask_call_obj*)malloc(sizeof(cctask_call_obj));
switch_copy_string(call_obj->task_id, id, sizeof(call_obj->task_id));
switch_copy_string(call_obj->gateway, gateway, sizeof(call_obj->gateway));
switch_copy_string(call_obj->exten, exten, sizeof(call_obj->exten));
switch_copy_string(call_obj->context, context, sizeof(call_obj->context));
switch_copy_string(call_obj->cid_name, cid_name, sizeof(call_obj->cid_name));
switch_copy_string(call_obj->cid_num, cid_num, sizeof(call_obj->cid_num));
switch_copy_string(call_obj->fifo_name, fifo_name, sizeof(call_obj->fifo_name));
sql = switch_mprintf("select id, phone from uncall_task.ccpaas_run_%s where call_time  is NULL limit %d  ;",id,send_limit);
cctask_execute_sql_callback(globals.sql_mutex, sql, sql2str_callback_run_tables, call_obj);
switch_safe_free(sql);
return 1;
}

/*
 * Count the rows of uncall_task.ccpaas_run_<id> whose call_time is NULL.
 *
 * The result buffer starts out as "0", so 0 is returned when the query does
 * not overwrite it with a non-zero number.
 */
static int GetRunTableCount(char *id)
{
    char count_buf[256] = "0";
    char *query = switch_mprintf("select count(1) from uncall_task.ccpaas_run_%s where call_time  is NULL;", id);

    cc_execute_sql2str(globals.sql_mutex, query, count_buf, sizeof(count_buf));
    switch_safe_free(query);

    return atoi(count_buf);
}

/*
 * Count the rows of uncall_pbx.fifo_outbound that belong to `fifo_name`.
 *
 * The name is interpolated with %q so an embedded single quote cannot break
 * out of the SQL string literal. The result buffer starts out as "0", so 0 is
 * returned when the query does not overwrite it with a non-zero number.
 */
static int GetNumberFifoCount(char *fifo_name)
{
    char res[256] = "0";
    char *sql = switch_mprintf("select count(*)  from  uncall_pbx.fifo_outbound where fifo_name='%q';", fifo_name);

    cc_execute_sql2str(globals.sql_mutex, sql, res, sizeof(res));
    switch_safe_free(sql);

    return atoi(res);
}


/*
 * Count the rows of uncall_pbx.sip_dialogs whose contact_user is one of the
 * extensions listed for `fifo_name` in uncall_config.pbx_fifo_extension.
 * The caller subtracts this from the FIFO member count to get idle agents.
 *
 * The name is interpolated with %q so an embedded single quote cannot break
 * out of the SQL string literal. The result buffer starts out as "0", so 0 is
 * returned when the query does not overwrite it with a non-zero number.
 */
static int GetDialogsCount(char *fifo_name)
{
    char res[256] = "0";
    char *sql = switch_mprintf("select count(*) from uncall_pbx.sip_dialogs as ud where ud.contact_user in (select uc.extension from uncall_config.pbx_fifo_extension uc where uc.fifo_name ='%q') ;", fifo_name);

    cc_execute_sql2str(globals.sql_mutex, sql, res, sizeof(res));
    switch_safe_free(sql);

    return atoi(res);
}

/*
 * Count the rows of uncall_task.phone_memory queued for `fifo_name`.
 *
 * The name is interpolated with %q so an embedded single quote cannot break
 * out of the SQL string literal. The result buffer starts out as "0", so 0 is
 * returned when the query does not overwrite it with a non-zero number.
 */
static int GetMemoryCount(char *fifo_name)
{
    char res[256] = "0";
    char *sql = switch_mprintf("select count(1) from uncall_task.phone_memory where fifo_name = '%q';", fifo_name);

    cc_execute_sql2str(globals.sql_mutex, sql, res, sizeof(res));
    switch_safe_free(sql);

    return atoi(res);
}


/*
 * Mark task `id` as finished by setting task_status to 'finish' in
 * uncall_task.ccpaas_task.
 *
 * The id is interpolated with %q so an embedded single quote cannot break out
 * of the SQL string literal. Always returns 0; the status of the update
 * itself is not reported to the caller.
 */
static int UpdateRun(char *id)
{
    char *sql = switch_mprintf("update `uncall_task`.`ccpaas_task` set `task_status`='finish' where `id`='%q' ;", id);

    cc_execute_sql(sql, globals.sql_mutex);
    switch_safe_free(sql);

    return 0;
}

//获取外呼任务数
static int sql2str_callback(void *pArg, int argc, char **argv, char **columnNames)
{
int fifo_number = 0;
int fifo_number_in_dialogs =0;
int fifo_idle = 0;
int memory = 0;
int send_limit = 0;
int run_table_count = 0;
char id[32]={'\0'};//id
char task_type[64]={'\0'}; //呼叫类型
int  call_max = 0; //最大并发数
char task_cortrol_type[32]={'\0'};//呼叫类型
float task_override = 0;//呼叫系数
char task_day_start[32]={'\0'};//开始时间
char task_day_end[32]={'\0'};//结束时间
char gateway[64]={'\0'}; //网关
char exten[64]={'\0'};//分机
char context[64]={'\0'};//context区分
char cid_name[64]={'\0'};//主叫
char cid_num[64]={'\0'};//主叫号码  
char fifo_name[64]={'\0'};//主叫号码

snprintf(id,sizeof(id),"%s",argv[0]);
snprintf(task_type,sizeof(task_type),"%s",argv[3]);
call_max = atoi(argv[6]);
snprintf(task_cortrol_type,sizeof(task_cortrol_type),"%s",argv[7]);
task_override = atof(argv[8]);
snprintf(task_day_start,sizeof(task_day_start),"%s",argv[9]);
snprintf(task_day_end,sizeof(task_day_end),"%s",argv[10]);
snprintf(gateway,sizeof(gateway),"%s",argv[11]);
snprintf(exten,sizeof(exten),"%s",argv[12]);
snprintf(context,sizeof(context),"%s",argv[13]);
snprintf(cid_name,sizeof(cid_name),"%s",argv[14]);
snprintf(cid_num,sizeof(cid_num),"%s",argv[15]);
snprintf(fifo_name,sizeof(fifo_name),"%s",argv[16]);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, " listen id ---> %s \n",id);
//判断是否在时间段之内
run_table_count = GetRunTableCount(id);
if(run_table_count<=0){ //没任务更新任务完成
UpdateRun(id);
return 0;
}

fifo_number = GetNumberFifoCount(fifo_name);
//判断FIFO中的Dialogs
fifo_number_in_dialogs = GetDialogsCount(fifo_name);
//空闲座席数
fifo_idle = fifo_number - fifo_number_in_dialogs;
if(fifo_idle <= 0){
return 0;
}
memory = GetMemoryCount(fifo_name);
if(0 == strcmp(task_cortrol_type,"i")){//
send_limit = (int)fifo_idle*task_override;
}else{
send_limit = call_max - memory ;
}
if(memory>=call_max){//超过最大并发数
send_limit = 0;
}
if(send_limit>0){//如果需要继续发号码
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, " send_limit ---> %d \n",send_limit);
InsterMemoryTables(id, gateway, exten, context, cid_name, cid_num, fifo_name,send_limit);
}
return 0;
}

/*
 * Open "fifo.conf" and pick up the ODBC DSN from its <settings> section.
 *
 * xml - receives the handle returned by switch_xml_open_cfg; it is not
 *       released here.
 * cfg - receives the configuration node.
 *
 * A non-empty "odbc-dsn" param is copied into globals.odbc_dsn when ODBC or
 * PGSQL support is available; otherwise an error is logged.
 *
 * Returns SWITCH_STATUS_TERM when the file cannot be opened and
 * SWITCH_STATUS_SUCCESS in every other case.
 */
static switch_status_t read_config_file(switch_xml_t *xml, switch_xml_t *cfg) {
    const char *config_name = "fifo.conf";
    switch_xml_t settings_node;
    switch_xml_t entry;

    *xml = switch_xml_open_cfg(config_name, cfg, NULL);
    if (!*xml) {
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Open of %s failed\n", config_name);
        return SWITCH_STATUS_TERM;
    }

    settings_node = switch_xml_child(*cfg, "settings");
    if (!settings_node) {
        return SWITCH_STATUS_SUCCESS;
    }

    for (entry = switch_xml_child(settings_node, "param"); entry; entry = entry->next) {
        char *name = (char*)switch_xml_attr_soft(entry, "name");
        char *value = (char*)switch_xml_attr_soft(entry, "value");

        /* Only a non-empty odbc-dsn entry is of interest. */
        if (strcasecmp(name, "odbc-dsn") != 0 || zstr(value)) {
            continue;
        }

        if (switch_odbc_available() || switch_pgsql_available()) {
            switch_set_string(globals.odbc_dsn, value);
        } else {
            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ODBC IS NOT AVAILABLE!\n");
        }
    }

    return SWITCH_STATUS_SUCCESS;
}

/*
 * Fetch the outbound tasks and run sql2str_callback on each row.
 *
 * NOTE(review): the query selects every row of uncall_task.ccpaas_task. An
 * earlier variant of this statement ended in "where task_status = 'run'";
 * confirm whether dropping that filter was intentional.
 */
static void cctask_outbound_sql()
{
    char result_buf[80] = "";
    callback_t ctx = { 0 };
    char *query;

    /* The buffer descriptor is passed along as the callback's user data. */
    ctx.buf = result_buf;
    ctx.len = sizeof(result_buf);

    query = switch_mprintf("select id, task_name, task_status, task_type, create_time, create_users, call_max, task_cortrol_type, task_override, task_day_start, task_day_end, gateway, exten, context, cid_name, cid_num, fifo_name ,company_code from uncall_task.ccpaas_task");
    cctask_execute_sql_callback(globals.sql_mutex, query, sql2str_callback, &ctx);
    switch_safe_free(query);
}

/*
 * Configuration hook: currently only logs "parse config" and reports success.
 * The `reload` flag is not used.
 */
static switch_status_t parse_config(switch_bool_t reload)
{
    (void) reload;

    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "parse config\n");

    return SWITCH_STATUS_SUCCESS;
}

/* Handler for the "cctask" API command: writes a fixed banner to the caller's stream. */
SWITCH_STANDARD_API(cctask_api_function)
{
    stream->write_function(stream, "CCTASK 1.0 \n");

    return SWITCH_STATUS_SUCCESS;
}


/* Module shutdown hook: releases nothing and reports success. */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_cctask_shutdown)
{
    return SWITCH_STATUS_SUCCESS;
}
/*
 * Module load hook.
 *
 * Records the switch name in globals.hostname, creates the module interface
 * and registers the "cctask" API command.
 *
 * Configuration and database setup are currently disabled; the steps that
 * were switched off are:
 *     switch_xml_t xml, cfg;
 *     if ((status = read_config_file(&xml, &cfg)) != SWITCH_STATUS_SUCCESS) return status;
 *     if (!(dbh = cctask_get_db_handle())) { log "Cannot open DB!" at CRIT }
 *     parse_config(SWITCH_FALSE);
 */
SWITCH_MODULE_LOAD_FUNCTION(mod_cctask_load)
{
    switch_api_interface_t *api_interface;

    /* Copies at most size-1 bytes, leaving the last byte of the buffer untouched. */
    strncpy(globals.hostname, switch_core_get_switchname(), sizeof(globals.hostname) - 1);

    /* Connect the internal structure to the blank pointer passed in. */
    *module_interface = switch_loadable_module_create_module_interface(pool, modname);
    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "CC-PAAS \n");

    SWITCH_ADD_API(api_interface, "cctask", "cctask of status", cctask_api_function, "[name]");

    /* Indicate that the module should continue to be loaded. */
    return SWITCH_STATUS_SUCCESS;
}


//SWITCH_MODULE_RUNTIME_FUNCTION(mod_cctask_runtime){
////    while(1) {    
////        cctask_outbound_sql();
////        switch_yield(1000000);   
////    } 
//}
/*
 * NOTE(review): these two macros are defined after every use of the same
 * names in this file, so they have no effect on the code above. They
 * presumably duplicate definitions already supplied by the FreeSWITCH
 * headers — confirm and remove.
 */
#define switch_set_string(_dst, _src) switch_copy_string(_dst, _src, sizeof(_dst))
#define switch_cache_db_get_db_handle_dsn(_a, _b) _switch_cache_db_get_db_handle_dsn(_a, _b, __FILE__, __SWITCH_FUNC__, __LINE__)