Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
trackrdrd
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
uplex-varnish
trackrdrd
Commits
6e631b41
Commit
6e631b41
authored
May 23, 2014
by
Geoff Simmons
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
trackrdrd: extended the MQ interface to allow the use of shard keys
(but the ActiveMQ implementation does not use sharding)
parent
e53b33b0
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
26 additions
and
12 deletions
+26
-12
mq.h
trackrdrd/include/mq.h
+4
-1
Makefile.am
trackrdrd/src/mq/activemq/Makefile.am
+1
-1
README.rst
trackrdrd/src/mq/activemq/README.rst
+3
-0
mq.c
trackrdrd/src/mq/activemq/mq.c
+6
-1
test_activemq.c
trackrdrd/src/mq/activemq/test/test_activemq.c
+3
-3
test_mq.c
trackrdrd/src/test/test_mq.c
+3
-3
trackrdrd.h
trackrdrd/src/trackrdrd.h
+2
-1
worker.c
trackrdrd/src/worker.c
+4
-2
No files found.
trackrdrd/include/mq.h
View file @
6e631b41
...
@@ -140,9 +140,12 @@ const char *MQ_WorkerInit(void **priv);
...
@@ -140,9 +140,12 @@ const char *MQ_WorkerInit(void **priv);
* @param priv private object handle
* @param priv private object handle
* @param data pointer to the data to be sent
* @param data pointer to the data to be sent
* @param len length of the data in bytes
* @param len length of the data in bytes
* @param key an optional sharding key for the messaging system
* @param keylen length of the sharding key
* @return `NULL` on success, an error message on failure
* @return `NULL` on success, an error message on failure
*/
*/
const
char
*
MQ_Send
(
void
*
priv
,
const
char
*
data
,
unsigned
len
);
const
char
*
MQ_Send
(
void
*
priv
,
const
char
*
data
,
unsigned
len
,
const
char
*
key
,
unsigned
keylen
);
/**
/**
* Return the version string of the messaging system.
* Return the version string of the messaging system.
...
...
trackrdrd/src/mq/activemq/Makefile.am
View file @
6e631b41
...
@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
...
@@ -19,7 +19,7 @@ libtrackrdr_activemq_la_LIBADD = \
@APR_LIBS@
\
@APR_LIBS@
\
@APU_LIBS@
@APU_LIBS@
libtrackrdr_activemq_la_LDFLAGS
=
-version-info
1
:0:0
libtrackrdr_activemq_la_LDFLAGS
=
-version-info
2
:0:0
if
HAVE_RST2MAN
if
HAVE_RST2MAN
dist_man_MANS
=
libtrackrdr-activemq.3
dist_man_MANS
=
libtrackrdr-activemq.3
...
...
trackrdrd/src/mq/activemq/README.rst
View file @
6e631b41
...
@@ -37,6 +37,9 @@ path is specified as ``mq.config_fname`` in the configuration of
...
@@ -37,6 +37,9 @@ path is specified as ``mq.config_fname`` in the configuration of
``libactivemq-cpp``. The dynamic linker must also be able to find
``libactivemq-cpp``. The dynamic linker must also be able to find
``libactivemq-cpp.so`` at runtime.
``libactivemq-cpp.so`` at runtime.
This implementation does not use sharding keys; the key data in the
call to ``MQ_Send()`` are silently discarded.
BUILD/INSTALL
BUILD/INSTALL
=============
=============
...
...
trackrdrd/src/mq/activemq/mq.c
View file @
6e631b41
...
@@ -140,8 +140,13 @@ MQ_WorkerInit(void **priv)
...
@@ -140,8 +140,13 @@ MQ_WorkerInit(void **priv)
}
}
const
char
*
const
char
*
MQ_Send
(
void
*
priv
,
const
char
*
data
,
unsigned
len
)
MQ_Send
(
void
*
priv
,
const
char
*
data
,
unsigned
len
,
const
char
*
key
,
unsigned
keylen
)
{
{
/* The ActiveMQ implementation does not use sharding. */
(
void
)
key
;
(
void
)
keylen
;
return
AMQ_Send
((
AMQ_Worker
*
)
priv
,
data
,
len
);
return
AMQ_Send
((
AMQ_Worker
*
)
priv
,
data
,
len
);
}
}
...
...
trackrdrd/src/mq/activemq/test/test_activemq.c
View file @
6e631b41
...
@@ -129,7 +129,7 @@ static const char
...
@@ -129,7 +129,7 @@ static const char
printf
(
"... testing ActiveMQ message send
\n
"
);
printf
(
"... testing ActiveMQ message send
\n
"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Send: worker is NULL before call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Send: worker is NULL before call"
);
err
=
MQ_Send
(
worker
,
"foo bar baz quux"
,
16
);
err
=
MQ_Send
(
worker
,
"foo bar baz quux"
,
16
,
"key"
,
3
);
VMASSERT
(
err
==
NULL
,
"MQ_Send: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Send: %s"
,
err
);
return
NULL
;
return
NULL
;
...
@@ -146,7 +146,7 @@ static const char
...
@@ -146,7 +146,7 @@ static const char
err
=
MQ_Reconnect
(
&
worker
);
err
=
MQ_Reconnect
(
&
worker
);
VMASSERT
(
err
==
NULL
,
"MQ_Reconnect: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Reconnect: %s"
,
err
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Reconnect: worker is NULL after call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Reconnect: worker is NULL after call"
);
err
=
MQ_Send
(
worker
,
"send after reconnect"
,
20
);
err
=
MQ_Send
(
worker
,
"send after reconnect"
,
20
,
"key"
,
3
);
VMASSERT
(
err
==
NULL
,
"MQ_Send() fails after reconnect: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Send() fails after reconnect: %s"
,
err
);
return
NULL
;
return
NULL
;
...
@@ -165,7 +165,7 @@ static const char
...
@@ -165,7 +165,7 @@ static const char
MASSERT0
(
worker
==
NULL
,
"Worker not NULL after shutdown"
);
MASSERT0
(
worker
==
NULL
,
"Worker not NULL after shutdown"
);
err
=
MQ_Send
(
worker
,
"foo bar baz quux"
,
16
);
err
=
MQ_Send
(
worker
,
"foo bar baz quux"
,
16
,
"key"
,
3
);
MASSERT0
(
err
!=
NULL
,
"No failure on MQ_Send after worker shutdown"
);
MASSERT0
(
err
!=
NULL
,
"No failure on MQ_Send after worker shutdown"
);
return
NULL
;
return
NULL
;
...
...
trackrdrd/src/test/test_mq.c
View file @
6e631b41
...
@@ -172,7 +172,7 @@ static const char
...
@@ -172,7 +172,7 @@ static const char
printf
(
"... testing message send
\n
"
);
printf
(
"... testing message send
\n
"
);
mu_assert
(
"MQ_Send: worker is NULL before call"
,
worker
!=
NULL
);
mu_assert
(
"MQ_Send: worker is NULL before call"
,
worker
!=
NULL
);
err
=
mqf
.
send
(
worker
,
"foo bar baz quux"
,
16
);
err
=
mqf
.
send
(
worker
,
"foo bar baz quux"
,
16
,
"key"
,
3
);
sprintf
(
errmsg
,
"MQ_Send: %s"
,
err
);
sprintf
(
errmsg
,
"MQ_Send: %s"
,
err
);
mu_assert
(
errmsg
,
err
==
NULL
);
mu_assert
(
errmsg
,
err
==
NULL
);
...
@@ -190,7 +190,7 @@ static const char
...
@@ -190,7 +190,7 @@ static const char
err
=
mqf
.
reconnect
(
&
worker
);
err
=
mqf
.
reconnect
(
&
worker
);
VMASSERT
(
err
==
NULL
,
"MQ_Reconnect: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Reconnect: %s"
,
err
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Reconnect: worker is NULL after call"
);
MASSERT0
(
worker
!=
NULL
,
"MQ_Reconnect: worker is NULL after call"
);
err
=
mqf
.
send
(
worker
,
"send after reconnect"
,
20
);
err
=
mqf
.
send
(
worker
,
"send after reconnect"
,
20
,
"key"
,
3
);
VMASSERT
(
err
==
NULL
,
"MQ_Send() fails after reconnect: %s"
,
err
);
VMASSERT
(
err
==
NULL
,
"MQ_Send() fails after reconnect: %s"
,
err
);
return
NULL
;
return
NULL
;
...
@@ -210,7 +210,7 @@ static const char
...
@@ -210,7 +210,7 @@ static const char
mu_assert
(
"Worker not NULL after shutdown"
,
worker
==
NULL
);
mu_assert
(
"Worker not NULL after shutdown"
,
worker
==
NULL
);
err
=
mqf
.
send
(
worker
,
"foo bar baz quux"
,
16
);
err
=
mqf
.
send
(
worker
,
"foo bar baz quux"
,
16
,
"key"
,
3
);
mu_assert
(
"No failure on MQ_Send after worker shutdown"
,
err
!=
NULL
);
mu_assert
(
"No failure on MQ_Send after worker shutdown"
,
err
!=
NULL
);
return
NULL
;
return
NULL
;
...
...
trackrdrd/src/trackrdrd.h
View file @
6e631b41
...
@@ -44,7 +44,8 @@
...
@@ -44,7 +44,8 @@
typedef
const
char
*
global_init_f
(
unsigned
nworkers
,
const
char
*
config_fname
);
typedef
const
char
*
global_init_f
(
unsigned
nworkers
,
const
char
*
config_fname
);
typedef
const
char
*
init_connections_f
(
void
);
typedef
const
char
*
init_connections_f
(
void
);
typedef
const
char
*
worker_init_f
(
void
**
priv
);
typedef
const
char
*
worker_init_f
(
void
**
priv
);
typedef
const
char
*
send_f
(
void
*
priv
,
const
char
*
data
,
unsigned
len
);
typedef
const
char
*
send_f
(
void
*
priv
,
const
char
*
data
,
unsigned
len
,
const
char
*
key
,
unsigned
keylen
);
typedef
const
char
*
version_f
(
void
*
priv
,
char
*
version
);
typedef
const
char
*
version_f
(
void
*
priv
,
char
*
version
);
typedef
const
char
*
client_id_f
(
void
*
priv
,
char
*
clientID
);
typedef
const
char
*
client_id_f
(
void
*
priv
,
char
*
clientID
);
typedef
const
char
*
reconnect_f
(
void
**
priv
);
typedef
const
char
*
reconnect_f
(
void
**
priv
);
...
...
trackrdrd/src/worker.c
View file @
6e631b41
...
@@ -127,7 +127,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
...
@@ -127,7 +127,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
AN
(
amq_worker
);
AN
(
amq_worker
);
/* XXX: report entry->incomplete to backend ? */
/* XXX: report entry->incomplete to backend ? */
err
=
mqf
.
send
(
*
amq_worker
,
entry
->
data
,
entry
->
end
);
err
=
mqf
.
send
(
*
amq_worker
,
entry
->
data
,
entry
->
end
,
entry
->
key
,
entry
->
keylen
);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
LOG_Log
(
LOG_WARNING
,
"Worker %d: Failed to send data: %s"
,
LOG_Log
(
LOG_WARNING
,
"Worker %d: Failed to send data: %s"
,
wrk
->
id
,
err
);
wrk
->
id
,
err
);
...
@@ -144,7 +145,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
...
@@ -144,7 +145,8 @@ wrk_send(void **amq_worker, dataentry *entry, worker_data_t *wrk)
wrk
->
reconnects
++
;
wrk
->
reconnects
++
;
wrk_log_connection
(
*
amq_worker
,
wrk
->
id
);
wrk_log_connection
(
*
amq_worker
,
wrk
->
id
);
MON_StatsUpdate
(
STATS_RECONNECT
);
MON_StatsUpdate
(
STATS_RECONNECT
);
err
=
mqf
.
send
(
*
amq_worker
,
entry
->
data
,
entry
->
end
);
err
=
mqf
.
send
(
*
amq_worker
,
entry
->
data
,
entry
->
end
,
entry
->
key
,
entry
->
keylen
);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
wrk
->
fails
++
;
wrk
->
fails
++
;
wrk
->
status
=
EXIT_FAILURE
;
wrk
->
status
=
EXIT_FAILURE
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment