Page MenuHomeDevCentral

rabbitmq-tcl.c
No OneTemporary

rabbitmq-tcl.c

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* ___ __ \_____ ___ /____ /____(_)_ /___ |/ /_ __ \
* __ /_/ / __ `/_ __ \_ __ \_ /_ __/_ /|_/ /_ / / /
* _ _, _// /_/ /_ /_/ / /_/ / / / /_ _ / / / / /_/ /
* /_/ |_| \__,_/ /_.___//_.___//_/ \__/ /_/ /_/ \___\_\
* _____________________
* RabbitMQ C AMQP client library TCL wrapper ___ __/_ ____/__ /
* TCL module to connect to AMQP brokers. __ / _ / __ /
* _ / / /___ _ /___
* (c) 2015, Nasqueron, some rights reserved. /_/ \____/ /_____/
* Released under BSD-2-Clause license.
*
* Provides a TCL rabbitmq.so module to get a 'mq' TCL command.
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <stdlib.h>
#include <string.h>
#include <tcl.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "config.h"
#include "version.h"
#include "rabbitmq-tcl.h"
/* -------------------------------------------------------------
Broker connections
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/**
 * State kept for one broker connection slot.
 */
struct broker_connection {
    /* Connection flag: 1 while connected to the broker, 0 otherwise */
    int connected;
    /* AMQP connection state stored by mq connect */
    amqp_connection_state_t connection;
};

/* One slot per mq command, indexed by the command's connection number */
struct broker_connection brokerConnections[MQ_COMMANDS_AMOUNT];
/* -------------------------------------------------------------
Helper functions
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/**
 * Gets product version string
 *
 * The string is allocated with malloc(); the caller owns it and must release
 * it with free().
 *
 * @return The product name and version, or NULL if the allocation failed
 */
char *get_version_string() {
    const size_t capacity = 256;
    char *versionString = malloc(capacity);

    if (versionString == NULL) {
        return NULL;
    }

    /* Bounded write: truncates instead of overflowing the buffer */
    snprintf(versionString, capacity, "%s %s", RABBITMQ_TCL_PRODUCT_NAME,
             RABBITMQ_TCL_VERSION);
    return versionString;
}
/**
 * Prints an error message as command result and notifies TCL an error
 * occurred.
 *
 * The message is copied by the interpreter (TCL_VOLATILE), so the caller may
 * pass a string literal, a stack buffer or a heap buffer and keeps ownership
 * of it.
 *
 * @param[out] tclInterpreter The interpreter in which to set result
 * @param[in] error The error message
 * @return TCL_ERROR
 */
int tcl_error(Tcl_Interp *tclInterpreter, char *error) {
    Tcl_SetResult(tclInterpreter, error, TCL_VOLATILE);
    return TCL_ERROR;
}
/**
 * Gets a broker error
 *
 * Fetches the reply of the last RPC call made on the connection and copies
 * it to the caller.
 *
 * @param[in] connection The AMQP connection
 * @param[out] rpcReply Receives the AMQP RPC reply; may be NULL when the
 *                      caller only needs the return value
 * @return 1 if an error occurred, 0 if not
 */
int amqp_get_error(amqp_connection_state_t connection,
                   amqp_rpc_reply_t *rpcReply) {
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(connection);

    /* Copy the reply through the pointer; assigning the pointer itself
       would only change the local parameter. */
    if (rpcReply != NULL) {
        *rpcReply = reply;
    }

    return reply.reply_type != AMQP_RESPONSE_NORMAL;
}
/**
 * Determines the AMQP error from the server RPC reply, prints an error message
 * based on this error and the specified context, then notifies TCL an error
 * occurred.
 *
 * The message is built in a fixed-size local buffer (truncated if too long)
 * and copied into the interpreter result, so nothing is left allocated.
 *
 * @param[out] tclInterpreter The interpreter in which to set result
 * @param[in] errorContext The context of the error message, typically what
 *                         was being done at the moment of the error
 * @param[in] rpcReply The AMQP RPC reply
 * @return TCL_OK if the reply is a normal response (result untouched);
 *         otherwise, TCL_ERROR
 */
int tcl_amqp_error(Tcl_Interp *tclInterpreter, const char *errorContext,
                   amqp_rpc_reply_t rpcReply) {
    char error[1024];

    if (rpcReply.reply_type == AMQP_RESPONSE_NORMAL) {
        // Not an error
        return TCL_OK;
    }

    if (rpcReply.reply_type == AMQP_RESPONSE_NONE) {
        snprintf(error, sizeof error,
                 "%s a broker error occurred, but with an unexpected RPC "
                 "reply type. Please report this bug as issue.",
                 errorContext);
    } else if (rpcReply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
        snprintf(error, sizeof error, "%s %s", errorContext,
                 amqp_error_string2(rpcReply.library_error));
    } else if (rpcReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
        if (rpcReply.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
            amqp_connection_close_t *m =
                (amqp_connection_close_t *)rpcReply.reply.decoded;
            /* reply_text is a length-delimited byte string, hence %.*s */
            snprintf(error, sizeof error,
                     "%s a server connection error %d occurred, message: %.*s",
                     errorContext, m->reply_code, (int)m->reply_text.len,
                     (char *)m->reply_text.bytes);
        } else if (rpcReply.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
            amqp_channel_close_t *m =
                (amqp_channel_close_t *)rpcReply.reply.decoded;
            snprintf(error, sizeof error,
                     "%s a server channel error %d occurred: %.*s",
                     errorContext, m->reply_code, (int)m->reply_text.len,
                     (char *)m->reply_text.bytes);
        } else {
            snprintf(error, sizeof error,
                     "%s an unknown server error occurred, method id 0x%08X",
                     errorContext, rpcReply.reply.id);
        }
    } else {
        /* Reply type not covered above: still produce a defined message */
        snprintf(error, sizeof error,
                 "%s an unknown broker error occurred, reply type %d",
                 errorContext, (int)rpcReply.reply_type);
    }

    /* TCL_VOLATILE: the interpreter copies the text before the local buffer
       goes out of scope. */
    Tcl_SetResult(tclInterpreter, error, TCL_VOLATILE);
    return TCL_ERROR;
}
/* -------------------------------------------------------------
TCL commands
mq
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/**
 * mq command
 *
 * Dispatches to the connect, disconnect or version subcommand; anything else
 * (including no subcommand at all) prints the usage.
 *
 * @param[in] clientData The connection offset (0 for mq, 1 for mq1, …),
 *                       stored as the command's client data
 * @param[out] tclInterpreter The interpreter to send command result to
 * @param[in] argc The amount of command arguments
 * @param[in] argv The command arguments
 * @return TCL_OK, TCL_ERROR, TCL_RETURN, TCL_BREAK or TCL_CONTINUE
 */
static int mq_command(ClientData clientData, Tcl_Interp *tclInterpreter,
                      int argc, char **argv) {
    const char *subcommand;
    int slot;

    /* A subcommand is mandatory */
    if (argc < 2) {
        return mq_usage(tclInterpreter);
    }

    subcommand = argv[1];
    slot = (int)clientData;

    if (!strcmp(subcommand, "connect")) {
        /* Hand over only the arguments following the subcommand */
        return mq_connect(slot, tclInterpreter, argc - 2, argv + 2);
    }
    if (!strcmp(subcommand, "disconnect")) {
        return mq_disconnect(slot, tclInterpreter);
    }
    if (!strcmp(subcommand, "version")) {
        return mq_version(tclInterpreter);
    }

    return mq_usage(tclInterpreter);
}
/**
 * Usage output, for mq called without a known subcommand
 *
 * @param[out] tclInterpreter The interpreter to send command result to
 * @return The value returned by tcl_error
 */
int mq_usage(Tcl_Interp *tclInterpreter) {
    char *usage = "Usage: mq <connect|disconnect|version>";

    return tcl_error(tclInterpreter, usage);
}
/**
 * mq version
 *
 * @param[out] tclInterpreter The interpreter to send command result to
 * @return TCL_OK on success, TCL_ERROR if the version string can't be built
 */
int mq_version(Tcl_Interp *tclInterpreter) {
    char *versionString = get_version_string();

    if (versionString == NULL) {
        return tcl_error(tclInterpreter, "Out of memory.");
    }

    /* The string is heap-allocated by get_version_string: let the
       interpreter take its own copy (TCL_VOLATILE), then release ours. */
    Tcl_SetResult(tclInterpreter, versionString, TCL_VOLATILE);
    free(versionString);

    return TCL_OK;
}
/**
 * mq connect
 *
 * Optional arguments, in order: host (passed through get_host/get_port),
 * user, password, vhost. Missing arguments fall back to the BROKER_*
 * defaults.
 *
 * On any failure the partially set up AMQP connection is destroyed, so a
 * failed attempt leaves nothing allocated by this function.
 *
 * @param[in] connectionNumber The connection offset (0 for mq, 1 for mq1, …)
 * @param[out] tclInterpreter The interpreter to send command result to
 * @param[in] argc The amount of command arguments
 * @param[in] argv The command arguments
 * @return TCL_OK on success, TCL_ERROR if already connected or can't connect
 */
int mq_connect(int connectionNumber, Tcl_Interp *tclInterpreter, int argc,
               char **argv) {
    char *host, *user, *pass, *vhost;
    int port, status;
    amqp_connection_state_t conn;
    amqp_socket_t *tcpSocket;
    amqp_rpc_reply_t result;

    // We don't allow to reconnect without first using mq disconnect
    if (brokerConnections[connectionNumber].connected == 1) {
        return tcl_error(tclInterpreter, "Already connected.");
    }

    // Connection parameters
    if (argc > 0 && argv[0]) {
        host = get_host(argv[0], BROKER_HOST);
        port = get_port(argv[0], BROKER_PORT);
    } else {
        host = BROKER_HOST;
        port = BROKER_PORT;
    }
    user = (argc > 1 && argv[1]) ? argv[1] : BROKER_USER;
    pass = (argc > 2 && argv[2]) ? argv[2] : BROKER_PASS;
    vhost = (argc > 3 && argv[3]) ? argv[3] : BROKER_VHOST;

    // Opens a TCP connection to the broker
    conn = amqp_new_connection();
    if (!conn) {
        return tcl_error(tclInterpreter, "Can't allocate AMQP connection.");
    }

    tcpSocket = amqp_tcp_socket_new(conn);
    if (!tcpSocket) {
        amqp_destroy_connection(conn);
        return tcl_error(tclInterpreter, "Can't create TCP socket.");
    }

    status = amqp_socket_open(tcpSocket, host, port);
    if (status) {
        /* The socket belongs to the connection; destroying the connection
           releases both. */
        amqp_destroy_connection(conn);
        return tcl_error(tclInterpreter, "Can't connect to the broker.");
    }

    // Logins to the broker
    // Unlimited channels, 128K (131072) frame size, no heartbeat
    result = amqp_login(conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user,
                        pass);
    if (result.reply_type != AMQP_RESPONSE_NORMAL) {
        amqp_destroy_connection(conn);
        return tcl_error(tclInterpreter, "Can't login to the broker.");
    }

    // Open a first channel
    amqp_channel_open(conn, 1);
    /* Read the reply of the channel.open call itself, so the check below
       doesn't look at the (successful) login reply. */
    result = amqp_get_rpc_reply(conn);
    if (result.reply_type != AMQP_RESPONSE_NORMAL) {
        /* Format the message first: the reply may reference memory owned by
           the connection. */
        status = tcl_amqp_error(tclInterpreter, "Can't open a channel:",
                                result);
        amqp_destroy_connection(conn);
        return status;
    }

    // We're connected. All is good.
    brokerConnections[connectionNumber].connection = conn;
    brokerConnections[connectionNumber].connected = 1;
    return TCL_OK;
}
/**
 * mq disconnect
 *
 * Closes channel 1, then the connection, and always destroys the AMQP
 * connection state, including when one of the close calls fails.
 *
 * @param[in] connectionNumber The connection offset (0 for mq, 1 for mq1, …)
 * @param[out] tclInterpreter The interpreter to send command result to
 * @return TCL_OK on success, TCL_ERROR if not connected or if the broker
 *         reported an error while closing
 */
int mq_disconnect(int connectionNumber, Tcl_Interp *tclInterpreter) {
    amqp_rpc_reply_t result;
    amqp_connection_state_t conn;
    int status = TCL_OK;

    if (brokerConnections[connectionNumber].connected == 0) {
        return tcl_error(tclInterpreter, "Not connected.");
    }

    // We mark early as disconnected, to allow to recycle the slot
    // even if an error occurs during disconnect.
    brokerConnections[connectionNumber].connected = 0;
    conn = brokerConnections[connectionNumber].connection;

    result = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    if (result.reply_type != AMQP_RESPONSE_NORMAL) {
        status = tcl_amqp_error(tclInterpreter,
                                "An error occured closing channel:", result);
    } else {
        result = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
        if (result.reply_type != AMQP_RESPONSE_NORMAL) {
            status = tcl_amqp_error(tclInterpreter,
                                    "An error occured closing connection:",
                                    result);
        }
    }

    /* The slot is already marked free, so this is the last reference to the
       connection: release it on the error paths too. The error message has
       been formatted above, before the connection memory goes away. */
    amqp_destroy_connection(conn);
    return status;
}
/* -------------------------------------------------------------
TCL initialisation code
Creation of the mq and numbered mq<N> commands.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/* Prototype of the initialisation entry point defined below; on Windows it
   must be exported from the DLL. */
#ifdef WINDOWS
extern __declspec(dllexport) int Rabbitmq_Init(Tcl_Interp *tclInterpreter);
#else
extern int Rabbitmq_Init(Tcl_Interp *tclInterpreter);
#endif
/**
 * Initializes the library and provides mq commands
 *
 * Registers MQ_COMMANDS_AMOUNT commands: mq for slot 0, then mq1, mq2, …
 * for the following slots. Each command receives its slot number as client
 * data, and every slot starts disconnected.
 *
 * @param[out] tclInterpreter The current TCL interpreter to provide command to
 * @return TCL_OK on success; otherwise, TCL_ERROR
 */
int Rabbitmq_Init(Tcl_Interp *tclInterpreter) {
    int i, result;
    char commandName[16];

    // TCL requirements and stubs
#if USE_TCL_STUBS
    /* With stubs enabled, the stubs table must be initialised before any
       other Tcl function is called. */
    if (Tcl_InitStubs(tclInterpreter, "8.1", 0) == NULL) {
        return TCL_ERROR;
    }
#endif
    if (Tcl_PkgRequire(tclInterpreter, "Tcl", TCL_VERSION, 0) == NULL) {
        if (TCL_VERSION[0] == '7') {
            if (Tcl_PkgRequire(tclInterpreter, "Tcl", "8.0", 0) == NULL) {
                return TCL_ERROR;
            }
        }
    }

    // Creates one command per connection slot.
    // The bound is exclusive: brokerConnections has exactly
    // MQ_COMMANDS_AMOUNT elements.
    for (i = 0; i < MQ_COMMANDS_AMOUNT; i++) {
        if (i == 0) {
            snprintf(commandName, sizeof commandName, "mq");
        } else {
            snprintf(commandName, sizeof commandName, "mq%d", i);
        }
        brokerConnections[i].connected = 0;
        Tcl_CreateCommand(tclInterpreter, commandName, mq_command,
                          (ClientData)i, (Tcl_CmdDeleteProc *)NULL);
    }

    // Provides a TCL package
    result = Tcl_PkgProvide(tclInterpreter, "rabbitmq", RABBITMQ_TCL_VERSION);
    if (result == TCL_ERROR) {
        return TCL_ERROR;
    }
    return TCL_OK;
}

File Metadata

Mime Type
text/x-c
Expires
Sat, Mar 7, 02:06 (1 d, 2 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3500253
Default Alt Text
rabbitmq-tcl.c (11 KB)

Event Timeline