Page Menu
Home
DevCentral
Search
Configure Global Search
Log In
Files
F24682914
rabbitmq-tcl.c
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Referenced Files
None
Subscribers
None
rabbitmq-tcl.c
View Options
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
*
* ___ __ \_____ ___ /____ /____(_)_ /___ |/ /_ __ \
* __ /_/ / __ `/_ __ \_ __ \_ /_ __/_ /|_/ /_ / / /
* _ _, _// /_/ /_ /_/ / /_/ / / / /_ _ / / / / /_/ /
* /_/ |_| \__,_/ /_.___//_.___//_/ \__/ /_/ /_/ \___\_\
* _____________________
* RabbitMQ C AMQP client library TCL wrapper ___ __/_ ____/__ /
* TCL module to connect to AMQP brokers. __ / _ / __ /
* _ / / /___ _ /___
* (c) 2015, Nasqueron, some rights reserved. /_/ \____/ /_____/
* Released under BSD-2-Clause license.
*
* Provides a TCL rabbitmq.so module to get a 'mq' TCL command.
*
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
#include
<stdlib.h>
#include
<string.h>
#include
<tcl.h>
#include
<amqp_tcp_socket.h>
#include
<amqp.h>
#include
<amqp_framing.h>
#include
"config.h"
#include
"version.h"
#include
"rabbitmq-tcl.h"
/* -------------------------------------------------------------
Broker connections
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/**
* Represents a connection with the broker, and stores properties.
*/
struct
broker_connection
{
int
connected
;
/* 0 if disconnected, 1 if connected */
amqp_connection_state_t
connection
;
}
brokerConnections
[
MQ_COMMANDS_AMOUNT
];
/* -------------------------------------------------------------
Helper functions
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/**
* Gets product version string
*
* @return The product name and version
*/
char
*
get_version_string
()
{
char
*
versionString
=
malloc
(
256
*
sizeof
(
char
));
sprintf
(
versionString
,
"%s %s"
,
RABBITMQ_TCL_PRODUCT_NAME
,
RABBITMQ_TCL_VERSION
);
return
versionString
;
}
/**
* Prints an error message as command result and notify TCL an error occured.
*
* @param[out] tclInterpreter The interpreter in which to set result
* @param[in] error The error message
* @return TCL_ERROR
*/
int
tcl_error
(
Tcl_Interp
*
tclInterpreter
,
char
*
error
)
{
Tcl_SetResult
(
tclInterpreter
,
error
,
TCL_STATIC
);
return
TCL_ERROR
;
}
/**
* Gets a broker error
*
* @param[in] connection The AMQP connection
* @param[out] rcpReply The AMQP RPC reply
* @return 1 if an error occured, 0 if not
*/
int
amqp_get_error
(
amqp_connection_state_t
connection
,
amqp_rpc_reply_t
*
rpcReply
)
{
amqp_rpc_reply_t
reply
=
amqp_get_rpc_reply
(
connection
);
rpcReply
=
&
reply
;
return
reply
.
reply_type
!=
AMQP_RESPONSE_NORMAL
;
}
/**
* Determines the AMQP error from the server RPC reply, prints an error message
* based on this error and the specified context, then notify TCL an error
*occured.
*
* @param[out] tclInterpreter The interpreter in which to set result
* @param[in] errorContext The context of the error message, typically what were
*done at the moment of the error
* @param[in] rcpReply The AMQP RPC reply
* @return TCL_ERROR
*/
int
tcl_amqp_error
(
Tcl_Interp
*
tclInterpreter
,
const
char
*
errorContext
,
amqp_rpc_reply_t
rpcReply
)
{
char
*
error
;
if
(
rpcReply
.
reply_type
==
AMQP_RESPONSE_NORMAL
)
{
// Not an error
return
TCL_OK
;
}
error
=
malloc
(
1024
*
sizeof
(
char
));
if
(
rpcReply
.
reply_type
==
AMQP_RESPONSE_NONE
)
{
sprintf
(
error
,
"%s a broker error occurred, but with an unexpected RPC "
"reply type. Please report this bug as issue."
,
errorContext
);
}
else
if
(
rpcReply
.
reply_type
==
AMQP_RESPONSE_LIBRARY_EXCEPTION
)
{
sprintf
(
error
,
"%s %s"
,
errorContext
,
amqp_error_string2
(
rpcReply
.
library_error
));
}
else
if
(
rpcReply
.
reply_type
==
AMQP_RESPONSE_SERVER_EXCEPTION
)
{
if
(
rpcReply
.
reply
.
id
==
AMQP_CONNECTION_CLOSE_METHOD
)
{
amqp_connection_close_t
*
m
=
(
amqp_connection_close_t
*
)
rpcReply
.
reply
.
decoded
;
sprintf
(
error
,
"%s a server connection error %d occurred, message: %.*s"
,
errorContext
,
m
->
reply_code
,
(
int
)
m
->
reply_text
.
len
,
(
char
*
)
m
->
reply_text
.
bytes
);
}
else
if
(
rpcReply
.
reply
.
id
==
AMQP_CHANNEL_CLOSE_METHOD
)
{
amqp_channel_close_t
*
m
=
(
amqp_channel_close_t
*
)
rpcReply
.
reply
.
decoded
;
sprintf
(
error
,
"%s a server channel error %d occurred: %.*s"
,
errorContext
,
m
->
reply_code
,
(
int
)
m
->
reply_text
.
len
,
(
char
*
)
m
->
reply_text
.
bytes
);
}
else
{
sprintf
(
error
,
"%s an unknown server error occurred, method id 0x%08X"
,
errorContext
,
rpcReply
.
reply
.
id
);
}
}
return
tcl_error
(
tclInterpreter
,
error
);
}
/* -------------------------------------------------------------
TCL commands
mq
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
/**
* mq command
*
* @param[in] connectionNumber The connection offset (0 for mq, 1 for mq1, …)
* @param[out] tclInterpreter The interpreter in which to create new command
* @param[in] argc The amount of command arguments
* @param[in] argv The command arguments
* @return TCL_OK, TCL_ERROR, TCL_RETURN, TCL_BREAK or TCL_CONTINUE
*/
static
int
mq_command
(
ClientData
clientData
,
Tcl_Interp
*
tclInterpreter
,
int
argc
,
char
**
argv
)
{
if
(
argc
<=
1
)
{
return
mq_usage
(
tclInterpreter
);
}
int
connectionNumber
=
(
int
)
clientData
;
if
(
strcmp
(
argv
[
1
],
"connect"
)
==
0
)
{
return
mq_connect
(
connectionNumber
,
tclInterpreter
,
argc
-
2
,
argv
+
2
);
}
else
if
(
strcmp
(
argv
[
1
],
"disconnect"
)
==
0
)
{
return
mq_disconnect
(
connectionNumber
,
tclInterpreter
);
}
else
if
(
strcmp
(
argv
[
1
],
"version"
)
==
0
)
{
return
mq_version
(
tclInterpreter
);
}
else
{
return
mq_usage
(
tclInterpreter
);
}
}
/**
* mq [not existing command] usage output
*
* @param[out] tclInterpreter The interpreter to send command result to
*/
int
mq_usage
(
Tcl_Interp
*
tclInterpreter
)
{
return
tcl_error
(
tclInterpreter
,
"Usage: mq <connect|disconnect|version>"
);
}
/**
* mq version
*
* @param[out] tclInterpreter The interpreter in which to create new command
* @return TCL_OK
*/
int
mq_version
(
Tcl_Interp
*
tclInterpreter
)
{
Tcl_SetResult
(
tclInterpreter
,
get_version_string
(),
TCL_STATIC
);
return
TCL_OK
;
}
/**
* mq connect
*
* @param[in] connectionNumber The connection offset (0 for mq, 1 for mq1, …)
* @param[out] tclInterpreter The interpreter in which to create new command
* @param[in] argc The amount of command arguments
* @param[in] argv The command arguments
* @return TCL_OK on success, TCL_ERROR if already connected or can't connect
*/
int
mq_connect
(
int
connectionNumber
,
Tcl_Interp
*
tclInterpreter
,
int
argc
,
char
**
argv
)
{
char
*
host
,
*
user
,
*
pass
,
*
vhost
;
int
port
,
status
;
amqp_connection_state_t
conn
;
amqp_socket_t
*
socket
;
amqp_rpc_reply_t
result
;
// We don't allow to reconnect without first used mq disconnect
if
(
brokerConnections
[
connectionNumber
].
connected
==
1
)
{
return
tcl_error
(
tclInterpreter
,
"Already connected."
);
}
// Connection parameters
if
(
argc
>
0
&&
argv
[
0
])
{
host
=
get_host
(
argv
[
0
],
BROKER_HOST
);
port
=
get_port
(
argv
[
0
],
BROKER_PORT
);
}
else
{
host
=
BROKER_HOST
;
port
=
BROKER_PORT
;
}
if
(
argc
>
1
&&
argv
[
1
])
{
user
=
argv
[
1
];
}
else
{
user
=
BROKER_USER
;
}
if
(
argc
>
2
&&
argv
[
2
])
{
pass
=
argv
[
2
];
}
else
{
pass
=
BROKER_PASS
;
}
if
(
argc
>
3
&&
argv
[
3
])
{
vhost
=
argv
[
3
];
}
else
{
vhost
=
BROKER_VHOST
;
}
// Opens a TCP connection to the broker
conn
=
amqp_new_connection
();
socket
=
amqp_tcp_socket_new
(
conn
);
if
(
!
socket
)
{
return
tcl_error
(
tclInterpreter
,
"Can't create TCP socket."
);
}
status
=
amqp_socket_open
(
socket
,
host
,
port
);
if
(
status
)
{
return
tcl_error
(
tclInterpreter
,
"Can't connect to the broker."
);
}
// Logins to the broker
// No heartbeat, unlimited channels, 128K (131072) frame size
result
=
amqp_login
(
conn
,
vhost
,
0
,
131072
,
0
,
AMQP_SASL_METHOD_PLAIN
,
user
,
pass
);
if
(
result
.
reply_type
!=
AMQP_RESPONSE_NORMAL
)
{
return
tcl_error
(
tclInterpreter
,
"Can't login to the broker."
);
}
// Open a first channel
amqp_channel_open
(
conn
,
1
);
if
(
amqp_get_error
(
conn
,
&
result
))
{
return
tcl_amqp_error
(
tclInterpreter
,
"Can't open a channel:"
,
result
);
}
// We're connected. All is good.
brokerConnections
[
connectionNumber
].
connection
=
conn
;
brokerConnections
[
connectionNumber
].
connected
=
1
;
return
TCL_OK
;
}
/**
* mq disconnect
*
* @param[in] connectionNumber The connection offset (0 for mq, 1 for mq1, …)
* @param[out] tclInetrpreter The interpreter in which to create new command
* @return TCL_OK on success, TCL_ERROR if not connected
*/
int
mq_disconnect
(
int
connectionNumber
,
Tcl_Interp
*
tclInterpreter
)
{
amqp_rpc_reply_t
result
;
if
(
brokerConnections
[
connectionNumber
].
connected
==
0
)
{
return
tcl_error
(
tclInterpreter
,
"Not connected."
);
}
// We mark early as disconnected, to allow to recycle the slot
// event if an error occurs during disconnect.
brokerConnections
[
connectionNumber
].
connected
=
0
;
amqp_connection_state_t
conn
=
brokerConnections
[
connectionNumber
].
connection
;
result
=
amqp_channel_close
(
conn
,
1
,
AMQP_REPLY_SUCCESS
);
if
(
result
.
reply_type
!=
AMQP_RESPONSE_NORMAL
)
{
return
tcl_amqp_error
(
tclInterpreter
,
"An error occured closing channel:"
,
result
);
}
result
=
amqp_connection_close
(
conn
,
AMQP_REPLY_SUCCESS
);
if
(
result
.
reply_type
!=
AMQP_RESPONSE_NORMAL
)
{
return
tcl_amqp_error
(
tclInterpreter
,
"An error occured closing connection:"
,
result
);
}
amqp_destroy_connection
(
conn
);
return
TCL_OK
;
}
/* -------------------------------------------------------------
TCL initialisation code
Creation of the mq and mq2-10 commands.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
#ifdef WINDOWS
extern
__declspec
(
dllexport
)
int
mq_init
(
Tcl_Interp
*
tclInterpreter
);
#else
extern
int
mq_init
(
Tcl_Interp
*
tclInterpreter
);
#endif
/**
* Initializes the library and provide mq commands
*
* @param[out] tclInterpreter The current TCL interpreter to provide command to
* @return TCL_OK on success; otherwise, TCL_ERROR
*/
int
Rabbitmq_Init
(
Tcl_Interp
*
tclInterpreter
)
{
int
i
,
result
;
char
commandName
[
10
];
// TCL requirements and stubs
if
(
Tcl_PkgRequire
(
tclInterpreter
,
"Tcl"
,
TCL_VERSION
,
0
)
==
NULL
)
{
if
(
TCL_VERSION
[
0
]
==
'7'
)
{
if
(
Tcl_PkgRequire
(
tclInterpreter
,
"Tcl"
,
"8.0"
,
0
)
==
NULL
)
{
return
TCL_ERROR
;
}
}
}
#if USE_TCL_STUBS
if
(
Tcl_InitStubs
(
tclInterpreter
,
"8.1"
,
0
)
==
NULL
)
{
return
TCL_ERROR
;
}
#endif
// Creates mq and mq1-mq10 commands
for
(
i
=
0
;
i
<=
MQ_COMMANDS_AMOUNT
;
i
++
)
{
if
(
i
==
0
)
{
strcpy
(
commandName
,
"mq"
);
}
else
{
sprintf
(
commandName
,
"mq%d"
,
i
);
}
Tcl_CreateCommand
(
tclInterpreter
,
commandName
,
mq_command
,
(
ClientData
)
i
,
(
Tcl_CmdDeleteProc
*
)
NULL
);
brokerConnections
[
i
].
connected
=
0
;
}
// Provides a TCL package
result
=
Tcl_PkgProvide
(
tclInterpreter
,
"rabbitmq"
,
RABBITMQ_TCL_VERSION
);
if
(
result
==
TCL_ERROR
)
{
return
TCL_ERROR
;
}
return
TCL_OK
;
}
File Metadata
Details
Attached
Mime Type
text/x-c
Expires
Sat, Mar 7, 02:06 (1 d, 2 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3500253
Default Alt Text
rabbitmq-tcl.c (11 KB)
Attached To
Mode
rRABBITMQTCL RabbitMQ TCL extension
Attached
Detach File
Event Timeline
Log In to Comment